]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: fix tests to sync from master 33049/head
authorYuval Lifshitz <yuvalif@yahoo.com>
Sun, 2 Feb 2020 19:03:25 +0000 (21:03 +0200)
committerYuval Lifshitz <yuvalif@yahoo.com>
Sun, 2 Feb 2020 19:03:25 +0000 (21:03 +0200)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
src/test/rgw/rgw_multi/tests.py
src/test/rgw/rgw_multi/tests_ps.py

index 911eaf0dcacc7b8eed0db54bd2462a5f40d1ba00..4ff642e68fe12a95256a2eb0b608ee506a732987 100644 (file)
@@ -354,7 +354,7 @@ def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, syn
     return True
 
 def zone_data_checkpoint(target_zone, source_zone):
-    if not target_zone.syncs_from(source_zone):
+    if not target_zone.syncs_from(source_zone.name):
         return
 
     log_status = data_source_log_status(source_zone)
@@ -384,7 +384,7 @@ def zonegroup_data_checkpoint(zonegroup_conns):
             zone_data_checkpoint(target_conn.zone, source_conn.zone)
 
 def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
-    if not target_zone.syncs_from(source_zone):
+    if not target_zone.syncs_from(source_zone.name):
         return
 
     log_status = bucket_source_log_status(source_zone, bucket_name)
index 4eb5bb8914018eb89b0d8ddd294595dc01c6c42f..05f1d797ac29eaeef049c32eab13f4e4b3ad577a 100644 (file)
@@ -568,19 +568,19 @@ def init_env(require_ps=True):
 
     zonegroup_meta_checkpoint(zonegroup)
 
-    ps_zones = []
-    zones = []
+    ps_zone = None
+    master_zone = None
     for conn in zonegroup_conns.zones:
+        if conn.zone == zonegroup.master_zone:
+            master_zone = conn
         if is_ps_zone(conn):
             zone_meta_checkpoint(conn.zone)
-            ps_zones.append(conn)
-        elif not conn.zone.is_read_only():
-            zones.append(conn)
+            ps_zone = conn
 
-    assert_not_equal(len(zones), 0)
+    assert_not_equal(master_zone, None)
     if require_ps:
-        assert_not_equal(len(ps_zones), 0)
-    return zones, ps_zones
+        assert_not_equal(ps_zone, None)
+    return master_zone, ps_zone
 
 
 def get_ip():
@@ -608,12 +608,12 @@ NOTIFICATION_SUFFIX = "_notif"
 def test_ps_info():
     """ log information for manual testing """
     return SkipTest("only used in manual testing")
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     bucket_name = gen_bucket_name()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # create objects in the bucket
     number_of_objects = 10
     for i in range(number_of_objects):
@@ -623,23 +623,23 @@ def test_ps_info():
     print('user: ' + get_user())
     print('tenant: ' + get_tenant())
     print('Master Zone')
-    print_connection_info(zones[0].conn)
+    print_connection_info(master_zone.conn)
     print('PubSub Zone')
-    print_connection_info(ps_zones[0].conn)
+    print_connection_info(ps_zone.conn)
     print('Bucket: ' + bucket_name)
 
 
 def test_ps_s3_notification_low_level():
     """ test low level implementation of s3 notifications """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     # create bucket on the first of the rados zones
-    zones[0].create_bucket(bucket_name)
+    master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create topic
     topic_name = bucket_name + TOPIC_SUFFIX
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     result, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     parsed_result = json.loads(result)
@@ -651,25 +651,25 @@ def test_ps_s3_notification_low_level():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
                        }]
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # get auto-generated topic
-    generated_topic_conf = PSTopic(ps_zones[0].conn, generated_topic_name)
+    generated_topic_conf = PSTopic(ps_zone.conn, generated_topic_name)
     result, status = generated_topic_conf.get_config()
     parsed_result = json.loads(result)
     assert_equal(status/100, 2)
     assert_equal(parsed_result['topic']['name'], generated_topic_name)
     # get auto-generated notification
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        generated_topic_name)
     result, status = notification_conf.get_config()
     parsed_result = json.loads(result)
     assert_equal(status/100, 2)
     assert_equal(len(parsed_result['topics']), 1)
     # get auto-generated subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
+    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                               generated_topic_name)
     result, status = sub_conf.get_config()
     parsed_result = json.loads(result)
@@ -699,20 +699,20 @@ def test_ps_s3_notification_low_level():
     # cleanup
     topic_conf.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_s3_notification_records():
     """ test s3 records fetching """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create topic
     topic_name = bucket_name + TOPIC_SUFFIX
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     result, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     parsed_result = json.loads(result)
@@ -723,12 +723,12 @@ def test_ps_s3_notification_records():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
                        }]
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # get auto-generated subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
+    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                               topic_name)
     _, status = sub_conf.get_config()
     assert_equal(status/100, 2)
@@ -738,7 +738,7 @@ def test_ps_s3_notification_records():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # get the events from the subscription
     result, _ = sub_conf.get_events()
@@ -755,21 +755,21 @@ def test_ps_s3_notification_records():
     # delete the keys
     for key in bucket.list():
         key.delete()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_s3_notification():
     """ test s3 notification set/get/delete """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     # create bucket on the first of the rados zones
-    zones[0].create_bucket(bucket_name)
+    master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     topic_name = bucket_name + TOPIC_SUFFIX
     # create topic
     topic_name = bucket_name + TOPIC_SUFFIX
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     response, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     parsed_result = json.loads(response)
@@ -780,7 +780,7 @@ def test_ps_s3_notification():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
                        }]
-    s3_notification_conf1 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf1 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf1.set_config()
     assert_equal(status/100, 2)
     # create another s3 notification with the same topic
@@ -789,10 +789,10 @@ def test_ps_s3_notification():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                        }]
-    s3_notification_conf2 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf2 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf2.set_config()
     assert_equal(status/100, 2)
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
 
     # get all notification on a bucket
     response, status = s3_notification_conf1.get_config()
@@ -820,37 +820,37 @@ def test_ps_s3_notification():
     # cleanup
     topic_conf.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_s3_topic_on_master():
     """ test s3 topics set/get/delete on master """
-    zones, _  = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # clean all topics
-    delete_all_s3_topics(zones[0], zonegroup.name)
+    delete_all_s3_topics(master_zone, zonegroup.name)
    
     # create s3 topics
     endpoint_address = 'amqp://127.0.0.1:7001'
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf1.set_config()
     assert_equal(topic_arn,
                  'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_1')
 
     endpoint_address = 'http://127.0.0.1:9001'
     endpoint_args = 'push-endpoint='+endpoint_address
-    topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf2.set_config()
     assert_equal(topic_arn,
                  'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_2')
     endpoint_address = 'http://127.0.0.1:9002'
     endpoint_args = 'push-endpoint='+endpoint_address
-    topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf3.set_config()
     assert_equal(topic_arn,
                  'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3')
@@ -890,8 +890,8 @@ def test_ps_s3_topic_on_master():
 
 def test_ps_s3_topic_with_secret_on_master():
     """ test s3 topics with secret set/get/delete on master """
-    zones, _  = init_env(require_ps=False)
-    if zones[0].secure_conn is None:
+    master_zone, _ = init_env(require_ps=False)
+    if master_zone.secure_conn is None:
         return SkipTest('secure connection is needed to test topic with secrets')
 
     realm = get_realm()
@@ -900,12 +900,12 @@ def test_ps_s3_topic_with_secret_on_master():
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # clean all topics
-    delete_all_s3_topics(zones[0], zonegroup.name)
+    delete_all_s3_topics(master_zone, zonegroup.name)
    
     # create s3 topics
     endpoint_address = 'amqp://user:password@127.0.0.1:7001'
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    bad_topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    bad_topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     try:
         result = bad_topic_conf.set_config()
     except Exception as err:
@@ -913,7 +913,7 @@ def test_ps_s3_topic_with_secret_on_master():
     else:
         assert False, 'user password configuration set allowed only over HTTPS'
     
-    topic_conf = PSTopicS3(zones[0].secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
 
     assert_equal(topic_arn,
@@ -940,17 +940,17 @@ def test_ps_s3_topic_with_secret_on_master():
 
 def test_ps_s3_notification_on_master():
     """ test s3 notification set/get/delete on master """
-    zones, _  = init_env(require_ps=False)
+    master_zone, _  = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     bucket_name = gen_bucket_name()
     # create bucket
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
     # create s3 topic
     endpoint_address = 'amqp://127.0.0.1:7001'
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -966,7 +966,7 @@ def test_ps_s3_notification_on_master():
                         'TopicArn': topic_arn,
                         'Events': []
                        }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -996,7 +996,7 @@ def test_ps_s3_notification_on_master():
     # cleanup
     topic_conf.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def ps_s3_notification_filter(on_master):
@@ -1008,18 +1008,18 @@ def ps_s3_notification_filter(on_master):
     if proc is  None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
     if on_master:
-        zones, _  = init_env(require_ps=False)
-        ps_zone = zones[0]
+        master_zone, _ = init_env(require_ps=False)
+        ps_zone = master_zone
     else:
-        zones, ps_zones  = init_env(require_ps=True)
-        ps_zone = ps_zones[0]
+        master_zone, ps_zone = init_env(require_ps=True)
+        ps_zone = ps_zone
 
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # start amqp receivers
@@ -1153,7 +1153,7 @@ def ps_s3_notification_filter(on_master):
         print('wait for 5sec for the messages...')
         time.sleep(5)
     else:
-        zone_bucket_checkpoint(ps_zone.zone, zones[0].zone, bucket_name)
+        zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     found_in1 = []
     found_in2 = []
@@ -1188,7 +1188,7 @@ def ps_s3_notification_filter(on_master):
     # delete the bucket
     for key in bucket.list():
         key.delete()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     stop_amqp_receiver(receiver, task)
     clean_rabbitmq(proc)
 
@@ -1203,17 +1203,17 @@ def test_ps_s3_notification_filter():
 
 def test_ps_s3_notification_errors_on_master():
     """ test s3 notification set/get/delete on master """
-    zones, _  = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     bucket_name = gen_bucket_name()
     # create bucket
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
     # create s3 topic
     endpoint_address = 'amqp://127.0.0.1:7001'
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
 
     # create s3 notification with invalid event name
@@ -1222,7 +1222,7 @@ def test_ps_s3_notification_errors_on_master():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:Kaboom']
                        }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     try:
       result, status = s3_notification_conf.set_config()
     except Exception as error:
@@ -1235,7 +1235,7 @@ def test_ps_s3_notification_errors_on_master():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:Put']
                        }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     try:
       _, _ = s3_notification_conf.set_config()
     except Exception as error:
@@ -1249,7 +1249,7 @@ def test_ps_s3_notification_errors_on_master():
                         'TopicArn': invalid_topic_arn,
                         'Events': ['s3:ObjectCreated:Put']
                        }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     try:
       _, _ = s3_notification_conf.set_config()
     except Exception as error:
@@ -1263,7 +1263,7 @@ def test_ps_s3_notification_errors_on_master():
                         'TopicArn': invalid_topic_arn ,
                         'Events': ['s3:ObjectCreated:Put']
                        }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     try:
       _, _ = s3_notification_conf.set_config()
     except Exception as error:
@@ -1276,7 +1276,7 @@ def test_ps_s3_notification_errors_on_master():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:Put']
                        }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, 'kaboom', topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, 'kaboom', topic_conf_list)
     try:
       _, _ = s3_notification_conf.set_config()
     except Exception as error:
@@ -1295,16 +1295,16 @@ def test_ps_s3_notification_errors_on_master():
     
     # cleanup
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_objcet_timing():
     return SkipTest("only used in manual testing")
-    zones, _  = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # create objects in the bucket (async)
     print('creating objects...')
     number_of_objects = 1000
@@ -1336,7 +1336,7 @@ def test_objcet_timing():
     print('average time for object deletion: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
     
     # cleanup
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_s3_notification_push_amqp_on_master():
@@ -1347,13 +1347,13 @@ def test_ps_s3_notification_push_amqp_on_master():
     proc = init_rabbitmq()
     if proc is  None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    zones, _  = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
     topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'
 
@@ -1368,11 +1368,11 @@ def test_ps_s3_notification_push_amqp_on_master():
     endpoint_address = 'amqp://' + hostname
     # with acks from broker
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    topic_conf1 = PSTopicS3(zones[0].conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf1 = PSTopicS3(master_zone.conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn1 = topic_conf1.set_config()
     # without acks from broker
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=none'
-    topic_conf2 = PSTopicS3(zones[0].conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf2 = PSTopicS3(master_zone.conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn2 = topic_conf2.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -1383,7 +1383,7 @@ def test_ps_s3_notification_push_amqp_on_master():
                          'Events': ['s3:ObjectCreated:*']
                        }]
 
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -1444,7 +1444,7 @@ def test_ps_s3_notification_push_amqp_on_master():
     topic_conf1.del_config()
     topic_conf2.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     clean_rabbitmq(proc)
 
 
@@ -1456,15 +1456,15 @@ def test_ps_s3_notification_push_kafka():
     if kafka_proc is  None or zk_proc is None:
         return SkipTest('end2end kafka tests require kafka/zookeeper installed')
 
-    zones, ps_zones  = init_env(require_ps=True)
+    master_zone, ps_zone = init_env()
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # name is constant for manual testing
     topic_name = bucket_name+'_topic'
     # create consumer on the topic
@@ -1472,7 +1472,7 @@ def test_ps_s3_notification_push_kafka():
     task.start()
 
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+    topic_conf = PSTopic(ps_zone.conn, topic_name,
                          endpoint='kafka://' + kafka_server,
                          endpoint_args='kafka-ack-level=broker')
     result, status = topic_conf.set_config()
@@ -1485,7 +1485,7 @@ def test_ps_s3_notification_push_kafka():
                          'Events': []
                        }]
 
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -1501,7 +1501,7 @@ def test_ps_s3_notification_push_kafka():
     [thr.join() for thr in client_threads] 
 
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     keys = list(bucket.list())
     receiver.verify_s3_events(keys, exact_match=True)
 
@@ -1513,14 +1513,14 @@ def test_ps_s3_notification_push_kafka():
         client_threads.append(thr)
     [thr.join() for thr in client_threads] 
     
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     receiver.verify_s3_events(keys, exact_match=True, deletions=True)
     
     # cleanup
     s3_notification_conf.del_config()
     topic_conf.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     stop_kafka_receiver(receiver, task)
     clean_kafka(kafka_proc, zk_proc, kafka_log)
 
@@ -1532,13 +1532,13 @@ def test_ps_s3_notification_push_kafka_on_master():
     kafka_proc, zk_proc, kafka_log = init_kafka()
     if kafka_proc is None or zk_proc is None:
         return SkipTest('end2end kafka tests require kafka/zookeeper installed')
-    zones, _  = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # name is constant for manual testing
     topic_name = bucket_name+'_topic'
     # create consumer on the topic
@@ -1549,10 +1549,10 @@ def test_ps_s3_notification_push_kafka_on_master():
     endpoint_address = 'kafka://' + kafka_server
     # without acks from broker
     endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
-    topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
     topic_arn1 = topic_conf1.set_config()
     endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
-    topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
     topic_arn2 = topic_conf2.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -1563,7 +1563,7 @@ def test_ps_s3_notification_push_kafka_on_master():
                          'Events': []
                        }]
 
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -1608,7 +1608,7 @@ def test_ps_s3_notification_push_kafka_on_master():
     topic_conf1.del_config()
     topic_conf2.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     stop_kafka_receiver(receiver, task)
     clean_kafka(kafka_proc, zk_proc, kafka_log)
 
@@ -1617,8 +1617,8 @@ def kafka_security(security_type):
     """ test pushing kafka s3 notification on master """
     if skip_push_tests:
         return SkipTest("PubSub push tests don't run in teuthology")
-    zones, _  = init_env(require_ps=False)
-    if security_type == 'SSL_SASL' and zones[0].secure_conn is None:
+    master_zone, _ = init_env(require_ps=False)
+    if security_type == 'SSL_SASL' and master_zone.secure_conn is None:
         return SkipTest("secure connection is needed to test SASL_SSL security")
     kafka_proc, zk_proc, kafka_log = init_kafka()
     if kafka_proc is None or zk_proc is None:
@@ -1628,7 +1628,7 @@ def kafka_security(security_type):
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # name is constant for manual testing
     topic_name = bucket_name+'_topic'
     # create consumer on the topic
@@ -1648,9 +1648,9 @@ def kafka_security(security_type):
     endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none&use-ssl=true&ca-location='+KAFKA_DIR+'rootCA.crt'
 
     if security_type == 'SSL_SASL':
-        topic_conf = PSTopicS3(zones[0].secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+        topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     else:
-        topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+        topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
 
     topic_arn = topic_conf.set_config()
     # create s3 notification
@@ -1659,7 +1659,7 @@ def kafka_security(security_type):
                          'Events': []
                        }]
 
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     s3_notification_conf.set_config()
 
     # create objects in the bucket (async)
@@ -1707,7 +1707,7 @@ def kafka_security(security_type):
         # delete the bucket
         for key in bucket.list():
             key.delete()
-        zones[0].delete_bucket(bucket_name)
+        master_zone.delete_bucket(bucket_name)
         stop_kafka_receiver(receiver, task)
         clean_kafka(kafka_proc, zk_proc, kafka_log)
 
@@ -1724,7 +1724,7 @@ def test_ps_s3_notification_push_http_on_master():
     if skip_push_tests:
         return SkipTest("PubSub push tests don't run in teuthology")
     hostname = get_ip()
-    zones, _  = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     
@@ -1737,13 +1737,13 @@ def test_ps_s3_notification_push_http_on_master():
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # create s3 topic
     endpoint_address = 'http://'+host+':'+str(port)
     endpoint_args = 'push-endpoint='+endpoint_address
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -1751,7 +1751,7 @@ def test_ps_s3_notification_push_http_on_master():
                         'TopicArn': topic_arn,
                         'Events': []
                        }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -1799,7 +1799,7 @@ def test_ps_s3_notification_push_http_on_master():
     topic_conf.del_config()
     s3_notification_conf.del_config(notification=notification_name)
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     http_server.close()
 
 
@@ -1808,7 +1808,7 @@ def test_ps_s3_opaque_data():
     if skip_push_tests:
         return SkipTest("PubSub push tests don't run in teuthology")
     hostname = get_ip()
-    zones, ps_zones  = init_env(require_ps=True)
+    master_zone, ps_zone = init_env()
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     
@@ -1821,16 +1821,16 @@ def test_ps_s3_opaque_data():
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
 
     # create s3 topic
     endpoint_address = 'http://'+host+':'+str(port)
     opaque_data = 'http://1.2.3.4:8888'
     endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
+    topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
     result, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     parsed_result = json.loads(result)
@@ -1841,7 +1841,7 @@ def test_ps_s3_opaque_data():
                         'TopicArn': topic_arn,
                         'Events': []
                        }]
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -1856,7 +1856,7 @@ def test_ps_s3_opaque_data():
     [thr.join() for thr in client_threads] 
 
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # check http receiver
     keys = list(bucket.list())
@@ -1872,7 +1872,7 @@ def test_ps_s3_opaque_data():
     topic_conf.del_config()
     s3_notification_conf.del_config(notification=notification_name)
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     http_server.close()
 
 
@@ -1881,7 +1881,7 @@ def test_ps_s3_opaque_data_on_master():
     if skip_push_tests:
         return SkipTest("PubSub push tests don't run in teuthology")
     hostname = get_ip()
-    zones, _  = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     
@@ -1894,14 +1894,14 @@ def test_ps_s3_opaque_data_on_master():
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # create s3 topic
     endpoint_address = 'http://'+host+':'+str(port)
     endpoint_args = 'push-endpoint='+endpoint_address
     opaque_data = 'http://1.2.3.4:8888'
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -1909,7 +1909,7 @@ def test_ps_s3_opaque_data_on_master():
                         'TopicArn': topic_arn,
                         'Events': []
                        }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -1944,19 +1944,19 @@ def test_ps_s3_opaque_data_on_master():
     topic_conf.del_config()
     s3_notification_conf.del_config(notification=notification_name)
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     http_server.close()
 
 def test_ps_topic():
     """ test set/get/delete of topic """
-    _, ps_zones = init_env()
+    _, ps_zone = init_env()
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     _, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     # get topic
@@ -1979,14 +1979,14 @@ def test_ps_topic():
 
 def test_ps_topic_with_endpoint():
     """ test set topic with endpoint"""
-    _, ps_zones = init_env()
+    _, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
     # create topic
     dest_endpoint = 'amqp://localhost:7001'
     dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+    topic_conf = PSTopic(ps_zone.conn, topic_name,
                          endpoint=dest_endpoint,
                          endpoint_args=dest_args)
     _, status = topic_conf.set_config()
@@ -2003,19 +2003,19 @@ def test_ps_topic_with_endpoint():
 
 def test_ps_notification():
     """ test set/get/delete of notification """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     topic_conf.set_config()
     # create bucket on the first of the rados zones
-    zones[0].create_bucket(bucket_name)
+    master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create notifications
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name)
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
@@ -2036,25 +2036,25 @@ def test_ps_notification():
 
     # cleanup
     topic_conf.del_config()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_notification_events():
     """ test set/get/delete of notification on specific events"""
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     topic_conf.set_config()
     # create bucket on the first of the rados zones
-    zones[0].create_bucket(bucket_name)
+    master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create notifications
     events = "OBJECT_CREATE,OBJECT_DELETE"
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name,
                                        events)
     _, status = notification_conf.set_config()
@@ -2071,29 +2071,29 @@ def test_ps_notification_events():
     # cleanup
     notification_conf.del_config()
     topic_conf.del_config()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_subscription():
     """ test set/get/delete of subscription """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     topic_conf.set_config()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create notifications
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name)
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                               topic_name)
     _, status = sub_conf.set_config()
     assert_equal(status/100, 2)
@@ -2107,7 +2107,7 @@ def test_ps_subscription():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # get the create events from the subscription
     result, _ = sub_conf.get_events()
@@ -2121,13 +2121,12 @@ def test_ps_subscription():
     for key in bucket.list():
         key.delete()
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # get the delete events from the subscriptions
-    result, _ = sub_conf.get_events()
-    for event in events['events']:
-        log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
+    #result, _ = sub_conf.get_events()
+    #for event in events['events']:
+    #    log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
     # TODO: check deletions
     # TODO: use exact match
     # verify_events_by_elements(events, keys, exact_match=False, deletions=True)
@@ -2144,58 +2143,58 @@ def test_ps_subscription():
     # cleanup
     notification_conf.del_config()
     topic_conf.del_config()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_event_type_subscription():
     """ test subscriptions for different events """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
 
     # create topic for objects creation
     topic_create_name = bucket_name+TOPIC_SUFFIX+'_create'
-    topic_create_conf = PSTopic(ps_zones[0].conn, topic_create_name)
+    topic_create_conf = PSTopic(ps_zone.conn, topic_create_name)
     topic_create_conf.set_config()
     # create topic for objects deletion
     topic_delete_name = bucket_name+TOPIC_SUFFIX+'_delete'
-    topic_delete_conf = PSTopic(ps_zones[0].conn, topic_delete_name)
+    topic_delete_conf = PSTopic(ps_zone.conn, topic_delete_name)
     topic_delete_conf.set_config()
     # create topic for all events
     topic_name = bucket_name+TOPIC_SUFFIX+'_all'
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     topic_conf.set_config()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_meta_checkpoint(ps_zone.zone)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     # create notifications for objects creation
-    notification_create_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_create_conf = PSNotification(ps_zone.conn, bucket_name,
                                               topic_create_name, "OBJECT_CREATE")
     _, status = notification_create_conf.set_config()
     assert_equal(status/100, 2)
     # create notifications for objects deletion
-    notification_delete_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_delete_conf = PSNotification(ps_zone.conn, bucket_name,
                                               topic_delete_name, "OBJECT_DELETE")
     _, status = notification_delete_conf.set_config()
     assert_equal(status/100, 2)
     # create notifications for all events
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name, "OBJECT_DELETE,OBJECT_CREATE")
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription for objects creation
-    sub_create_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_create',
+    sub_create_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_create',
                                      topic_create_name)
     _, status = sub_create_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription for objects deletion
-    sub_delete_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_delete',
+    sub_delete_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_delete',
                                      topic_delete_name)
     _, status = sub_delete_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription for all events
-    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_all',
+    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_all',
                               topic_name)
     _, status = sub_conf.set_config()
     assert_equal(status/100, 2)
@@ -2205,7 +2204,7 @@ def test_ps_event_type_subscription():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # get the events from the creation subscription
     result, _ = sub_create_conf.get_events()
@@ -2235,7 +2234,7 @@ def test_ps_event_type_subscription():
     for key in bucket.list():
         key.delete()
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     log.debug("Event (OBJECT_DELETE) synced")
 
     # get the events from the creations subscription
@@ -2283,29 +2282,29 @@ def test_ps_event_type_subscription():
     topic_create_conf.del_config()
     topic_delete_conf.del_config()
     topic_conf.del_config()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_event_fetching():
     """ test incremental fetching of events from a subscription """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     topic_conf.set_config()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create notifications
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name)
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                               topic_name)
     _, status = sub_conf.set_config()
     assert_equal(status/100, 2)
@@ -2315,7 +2314,7 @@ def test_ps_event_fetching():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     max_events = 15
     total_events_count = 0
     next_marker = None
@@ -2341,29 +2340,29 @@ def test_ps_event_fetching():
     topic_conf.del_config()
     for key in bucket.list():
         key.delete()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_event_acking():
     """ test acking of some events in a subscription """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     topic_conf.set_config()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create notifications
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name)
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                               topic_name)
     _, status = sub_conf.set_config()
     assert_equal(status/100, 2)
@@ -2373,7 +2372,7 @@ def test_ps_event_acking():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # get the create events from the subscription
     result, _ = sub_conf.get_events()
@@ -2406,29 +2405,29 @@ def test_ps_event_acking():
     topic_conf.del_config()
     for key in bucket.list():
         key.delete()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_creation_triggers():
     """ test object creation notifications in using put/copy/post """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     topic_conf.set_config()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create notifications
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name)
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                               topic_name)
     _, status = sub_conf.set_config()
     assert_equal(status/100, 2)
@@ -2447,7 +2446,7 @@ def test_ps_creation_triggers():
     uploader.complete_upload()
     fp.close()
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # get the create events from the subscription
     result, _ = sub_conf.get_events()
@@ -2463,7 +2462,7 @@ def test_ps_creation_triggers():
     topic_conf.del_config()
     for key in bucket.list():
         key.delete()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_s3_creation_triggers_on_master():
@@ -2474,13 +2473,13 @@ def test_ps_s3_creation_triggers_on_master():
     proc = init_rabbitmq()
     if proc is  None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    zones, _  = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # start amqp receiver
@@ -2491,7 +2490,7 @@ def test_ps_s3_creation_triggers_on_master():
     # create s3 topic
     endpoint_address = 'amqp://' + hostname
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -2499,7 +2498,7 @@ def test_ps_s3_creation_triggers_on_master():
                         'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy']
                        }]
 
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -2532,7 +2531,7 @@ def test_ps_s3_creation_triggers_on_master():
     for key in bucket.list():
         key.delete()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     clean_rabbitmq(proc)
 
 
@@ -2544,13 +2543,13 @@ def test_ps_s3_multipart_on_master():
     proc = init_rabbitmq()
     if proc is  None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    zones, _  = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # start amqp receivers
@@ -2565,11 +2564,11 @@ def test_ps_s3_multipart_on_master():
     # create s3 topics
     endpoint_address = 'amqp://' + hostname
     endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
-    topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
     topic_arn1 = topic_conf1.set_config()
-    topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
     topic_arn2 = topic_conf2.set_config()
-    topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
     topic_arn3 = topic_conf3.set_config()
 
     # create s3 notifications
@@ -2583,7 +2582,7 @@ def test_ps_s3_multipart_on_master():
                        {'Id': notification_name+'_3', 'TopicArn': topic_arn3,
                         'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
                        }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -2626,51 +2625,51 @@ def test_ps_s3_multipart_on_master():
     for key in bucket.list():
         key.delete()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     clean_rabbitmq(proc)
 
 
 def test_ps_versioned_deletion():
     """ test notification of deletion markers """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
     # create topics
-    topic_conf1 = PSTopic(ps_zones[0].conn, topic_name+'_1')
+    topic_conf1 = PSTopic(ps_zone.conn, topic_name+'_1')
     _, status = topic_conf1.set_config()
     assert_equal(status/100, 2)
-    topic_conf2 = PSTopic(ps_zones[0].conn, topic_name+'_2')
+    topic_conf2 = PSTopic(ps_zone.conn, topic_name+'_2')
     _, status = topic_conf2.set_config()
     assert_equal(status/100, 2)
     
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     bucket.configure_versioning(True)
     
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     
     # create notifications
     event_type1 = 'OBJECT_DELETE'
-    notification_conf1 = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf1 = PSNotification(ps_zone.conn, bucket_name,
                                         topic_name+'_1',
                                         event_type1)
     _, status = notification_conf1.set_config()
     assert_equal(status/100, 2)
     event_type2 = 'DELETE_MARKER_CREATE'
-    notification_conf2 = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf2 = PSNotification(ps_zone.conn, bucket_name,
                                         topic_name+'_2',
                                         event_type2)
     _, status = notification_conf2.set_config()
     assert_equal(status/100, 2)
     
     # create subscriptions
-    sub_conf1 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_1',
+    sub_conf1 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_1',
                                topic_name+'_1')
     _, status = sub_conf1.set_config()
     assert_equal(status/100, 2)
-    sub_conf2 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_2',
+    sub_conf2 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_2',
                                topic_name+'_2')
     _, status = sub_conf2.set_config()
     assert_equal(status/100, 2)
@@ -2685,7 +2684,7 @@ def test_ps_versioned_deletion():
     delete_marker_key = bucket.delete_key(key.name)
     
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # delete the deletion marker
     delete_marker_key.delete()
@@ -2694,7 +2693,7 @@ def test_ps_versioned_deletion():
     bucket.delete_key(key.name, version_id=v1)
     
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # get the delete events from the subscription
     result, _ = sub_conf1.get_events()
@@ -2717,7 +2716,7 @@ def test_ps_versioned_deletion():
     zonegroup_conns = ZonegroupConns(zonegroup)
     try:
         zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
-        zones[0].delete_bucket(bucket_name)
+        master_zone.delete_bucket(bucket_name)
     except:
         log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket')
     sub_conf1.del_config()
@@ -2736,13 +2735,13 @@ def test_ps_s3_metadata_on_master():
     proc = init_rabbitmq()
     if proc is  None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    zones, _  = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # start amqp receiver
@@ -2753,7 +2752,7 @@ def test_ps_s3_metadata_on_master():
     # create s3 topic
     endpoint_address = 'amqp://' + hostname
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -2769,7 +2768,7 @@ def test_ps_s3_metadata_on_master():
         }
     }]
 
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -2825,7 +2824,7 @@ def test_ps_s3_metadata_on_master():
     s3_notification_conf.del_config()
     topic_conf.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     clean_rabbitmq(proc)
 
 
@@ -2837,13 +2836,13 @@ def test_ps_s3_tags_on_master():
     proc = init_rabbitmq()
     if proc is  None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    zones, _  = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # start amqp receiver
@@ -2854,7 +2853,7 @@ def test_ps_s3_tags_on_master():
     # create s3 topic
     endpoint_address = 'amqp://' + hostname
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -2867,17 +2866,17 @@ def test_ps_s3_tags_on_master():
         }
     }]
 
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
     # create objects in the bucket with tags
     tags = 'hello=world&ka=boom'
     key_name1 = 'key1'
-    put_object_tagging(zones[0].conn, bucket_name, key_name1, tags)
+    put_object_tagging(master_zone.conn, bucket_name, key_name1, tags)
     tags = 'foo=bar&ka=boom'
     key_name2 = 'key2'
-    put_object_tagging(zones[0].conn, bucket_name, key_name2, tags)
+    put_object_tagging(master_zone.conn, bucket_name, key_name2, tags)
     key_name3 = 'key3'
     key = bucket.new_key(key_name3)
     key.set_contents_from_string('bar')
@@ -2888,7 +2887,7 @@ def test_ps_s3_tags_on_master():
     expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}]
     # check amqp receiver
     for event in receiver.get_and_reset_events():
-        obj_tags =  event['s3']['object']['tags']
+        obj_tags =  event['Records'][0]['s3']['object']['tags']
         assert_equal(obj_tags[0], expected_tags[0])
 
     # delete the objects
@@ -2898,7 +2897,7 @@ def test_ps_s3_tags_on_master():
     time.sleep(5)
     # check amqp receiver
     for event in receiver.get_and_reset_events():
-        obj_tags =  event['s3']['object']['tags']
+        obj_tags =  event['Records'][0]['s3']['object']['tags']
         assert_equal(obj_tags[0], expected_tags[0])
 
     # cleanup
@@ -2906,7 +2905,7 @@ def test_ps_s3_tags_on_master():
     s3_notification_conf.del_config()
     topic_conf.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     clean_rabbitmq(proc)
 
 
@@ -2918,13 +2917,13 @@ def test_ps_s3_versioned_deletion_on_master():
     proc = init_rabbitmq()
     if proc is  None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    zones, _  = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     bucket.configure_versioning(True)
     topic_name = bucket_name + TOPIC_SUFFIX
 
@@ -2936,7 +2935,7 @@ def test_ps_s3_versioned_deletion_on_master():
     # create s3 topic
     endpoint_address = 'amqp://' + hostname
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -2950,7 +2949,7 @@ def test_ps_s3_versioned_deletion_on_master():
                        {'Id': notification_name+'_3', 'TopicArn': topic_arn,
                          'Events': ['s3:ObjectRemoved:Delete']
                        }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -2998,7 +2997,7 @@ def test_ps_s3_versioned_deletion_on_master():
     s3_notification_conf.del_config()
     topic_conf.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     clean_rabbitmq(proc)
 
 
@@ -3006,7 +3005,7 @@ def test_ps_push_http():
     """ test pushing to http endpoint """
     if skip_push_tests:
         return SkipTest("PubSub push tests don't run in teuthology")
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
@@ -3017,20 +3016,20 @@ def test_ps_push_http():
     http_server = StreamingHTTPServer(host, port)
 
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     _, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create notifications
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name)
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                               topic_name, endpoint='http://'+host+':'+str(port))
     _, status = sub_conf.set_config()
     assert_equal(status/100, 2)
@@ -3040,7 +3039,7 @@ def test_ps_push_http():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     # check http server
     keys = list(bucket.list())
     # TODO: use exact match
@@ -3050,8 +3049,8 @@ def test_ps_push_http():
     for key in bucket.list():
         key.delete()
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_meta_checkpoint(ps_zone.zone)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     # check http server
     # TODO: use exact match
     http_server.verify_events(keys, deletions=True, exact_match=False)
@@ -3060,7 +3059,7 @@ def test_ps_push_http():
     sub_conf.del_config()
     notification_conf.del_config()
     topic_conf.del_config()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     http_server.close()
 
 
@@ -3068,7 +3067,7 @@ def test_ps_s3_push_http():
     """ test pushing to http endpoint s3 record format"""
     if skip_push_tests:
         return SkipTest("PubSub push tests don't run in teuthology")
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
@@ -3079,23 +3078,23 @@ def test_ps_s3_push_http():
     http_server = StreamingHTTPServer(host, port)
 
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+    topic_conf = PSTopic(ps_zone.conn, topic_name,
                          endpoint='http://'+host+':'+str(port))
     result, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     parsed_result = json.loads(result)
     topic_arn = parsed_result['arn']
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                        }]
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
     # create objects in the bucket
@@ -3104,7 +3103,7 @@ def test_ps_s3_push_http():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     # check http server
     keys = list(bucket.list())
     # TODO: use exact match
@@ -3114,8 +3113,8 @@ def test_ps_s3_push_http():
     for key in bucket.list():
         key.delete()
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_meta_checkpoint(ps_zone.zone)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     # check http server
     # TODO: use exact match
     http_server.verify_s3_events(keys, deletions=True, exact_match=False)
@@ -3123,7 +3122,7 @@ def test_ps_s3_push_http():
     # cleanup
     s3_notification_conf.del_config()
     topic_conf.del_config()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     http_server.close()
 
 
@@ -3135,7 +3134,7 @@ def test_ps_push_amqp():
     proc = init_rabbitmq()
     if proc is  None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
@@ -3143,20 +3142,20 @@ def test_ps_push_amqp():
     exchange = 'ex1'
     task, receiver = create_amqp_receiver_thread(exchange, topic_name)
     task.start()
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     _, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create notifications
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name)
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                               topic_name, endpoint='amqp://'+hostname,
                               endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=broker')
     _, status = sub_conf.set_config()
@@ -3167,7 +3166,7 @@ def test_ps_push_amqp():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     # check amqp receiver
     keys = list(bucket.list())
     # TODO: use exact match
@@ -3177,8 +3176,8 @@ def test_ps_push_amqp():
     for key in bucket.list():
         key.delete()
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_meta_checkpoint(ps_zone.zone)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     # check amqp receiver
     # TODO: use exact match
     receiver.verify_events(keys, deletions=True, exact_match=False)
@@ -3188,7 +3187,7 @@ def test_ps_push_amqp():
     sub_conf.del_config()
     notification_conf.del_config()
     topic_conf.del_config()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     clean_rabbitmq(proc)
 
 
@@ -3200,7 +3199,7 @@ def test_ps_s3_push_amqp():
     proc = init_rabbitmq()
     if proc is  None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
@@ -3208,7 +3207,7 @@ def test_ps_s3_push_amqp():
     exchange = 'ex1'
     task, receiver = create_amqp_receiver_thread(exchange, topic_name)
     task.start()
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+    topic_conf = PSTopic(ps_zone.conn, topic_name,
                          endpoint='amqp://' + hostname,
                          endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
     result, status = topic_conf.set_config()
@@ -3216,16 +3215,16 @@ def test_ps_s3_push_amqp():
     parsed_result = json.loads(result)
     topic_arn = parsed_result['arn']
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                        }]
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
     # create objects in the bucket
@@ -3234,7 +3233,7 @@ def test_ps_s3_push_amqp():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     # check amqp receiver
     keys = list(bucket.list())
     # TODO: use exact match
@@ -3244,8 +3243,8 @@ def test_ps_s3_push_amqp():
     for key in bucket.list():
         key.delete()
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_meta_checkpoint(ps_zone.zone)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     # check amqp receiver
     # TODO: use exact match
     receiver.verify_s3_events(keys, deletions=True, exact_match=False)
@@ -3254,22 +3253,22 @@ def test_ps_s3_push_amqp():
     stop_amqp_receiver(receiver, task)
     s3_notification_conf.del_config()
     topic_conf.del_config()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     clean_rabbitmq(proc)
 
 
 def test_ps_delete_bucket():
     """ test notification status upon bucket deletion """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     topic_name = bucket_name + TOPIC_SUFFIX
     # create topic
     topic_name = bucket_name + TOPIC_SUFFIX
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     response, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     parsed_result = json.loads(response)
@@ -3280,12 +3279,12 @@ def test_ps_delete_bucket():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
                        }]
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
     # create non-s3 notification
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name)
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
@@ -3296,20 +3295,20 @@ def test_ps_delete_bucket():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for bucket sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     keys = list(bucket.list())
     # delete objects from the bucket
     for key in bucket.list():
         key.delete()
     # wait for bucket sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     # wait for meta sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
 
     # get the events from the auto-generated subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
+    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                               topic_name)
     result, _ = sub_conf.get_events()
     records = json.loads(result)
@@ -3329,14 +3328,14 @@ def test_ps_delete_bucket():
 
 def test_ps_missing_topic():
     """ test creating a subscription when no topic info exists"""
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
     # create bucket on the first of the rados zones
-    zones[0].create_bucket(bucket_name)
+    master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     topic_arn = 'arn:aws:sns:::' + topic_name
@@ -3344,7 +3343,7 @@ def test_ps_missing_topic():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
                        }]
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     try:
         s3_notification_conf.set_config()
     except:
@@ -3353,7 +3352,7 @@ def test_ps_missing_topic():
         assert 'missing topic is expected'
 
     # cleanup
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_s3_topic_update():
@@ -3363,7 +3362,7 @@ def test_ps_s3_topic_update():
     rabbit_proc = init_rabbitmq()
     if rabbit_proc is  None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
@@ -3372,7 +3371,7 @@ def test_ps_s3_topic_update():
     exchange = 'ex1'
     amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name)
     amqp_task.start()
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+    topic_conf = PSTopic(ps_zone.conn, topic_name,
                           endpoint='amqp://' + hostname,
                           endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
     result, status = topic_conf.set_config()
@@ -3392,16 +3391,16 @@ def test_ps_s3_topic_update():
     http_server = StreamingHTTPServer(hostname, port)
 
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
                        }]
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
     # create objects in the bucket
@@ -3410,14 +3409,14 @@ def test_ps_s3_topic_update():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     keys = list(bucket.list())
     # TODO: use exact match
     receiver.verify_s3_events(keys, exact_match=False)
 
     # update the same topic with new endpoint
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+    topic_conf = PSTopic(ps_zone.conn, topic_name,
                          endpoint='http://'+ hostname + ':' + str(port))
     _, status = topic_conf.set_config()
     assert_equal(status/100, 2)
@@ -3435,8 +3434,8 @@ def test_ps_s3_topic_update():
         key = bucket.new_key(str(i+100))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_meta_checkpoint(ps_zone.zone)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     keys = list(bucket.list())
     # verify that notifications are still sent to amqp
@@ -3448,7 +3447,7 @@ def test_ps_s3_topic_update():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
                        }]
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
     
@@ -3459,8 +3458,8 @@ def test_ps_s3_topic_update():
         key = bucket.new_key(str(i+200))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_meta_checkpoint(ps_zone.zone)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     keys = list(bucket.list())
     # check that updates switched to http
@@ -3474,7 +3473,7 @@ def test_ps_s3_topic_update():
         key.delete()
     s3_notification_conf.del_config()
     topic_conf.del_config()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     http_server.close()
     clean_rabbitmq(rabbit_proc)
 
@@ -3488,7 +3487,7 @@ def test_ps_s3_notification_update():
     if rabbit_proc is  None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
     topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
@@ -3503,14 +3502,14 @@ def test_ps_s3_notification_update():
     # start an http server in a separate thread
     http_server = StreamingHTTPServer(hostname, http_port)
 
-    topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1,
+    topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
                           endpoint='amqp://' + hostname,
                           endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
     result, status = topic_conf1.set_config()
     parsed_result = json.loads(result)
     topic_arn1 = parsed_result['arn']
     assert_equal(status/100, 2)
-    topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2,
+    topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
                           endpoint='http://'+hostname+':'+str(http_port))
     result, status = topic_conf2.set_config()
     parsed_result = json.loads(result)
@@ -3518,16 +3517,16 @@ def test_ps_s3_notification_update():
     assert_equal(status/100, 2)
 
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create s3 notification with topic1
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     topic_conf_list = [{'Id': notification_name,
                         'TopicArn': topic_arn1,
                         'Events': ['s3:ObjectCreated:*']
                        }]
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
     # create objects in the bucket
@@ -3536,7 +3535,7 @@ def test_ps_s3_notification_update():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     keys = list(bucket.list())
     # TODO: use exact match
@@ -3547,7 +3546,7 @@ def test_ps_s3_notification_update():
                         'TopicArn': topic_arn2,
                         'Events': ['s3:ObjectCreated:*']
                        }]
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -3558,8 +3557,8 @@ def test_ps_s3_notification_update():
         key = bucket.new_key(str(i+100))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_meta_checkpoint(ps_zone.zone)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     keys = list(bucket.list())
     # check that updates switched to http
@@ -3574,7 +3573,7 @@ def test_ps_s3_notification_update():
     s3_notification_conf.del_config()
     topic_conf1.del_config()
     topic_conf2.del_config()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     http_server.close()
     clean_rabbitmq(rabbit_proc)
 
@@ -3588,7 +3587,7 @@ def test_ps_s3_multiple_topics_notification():
     if rabbit_proc is  None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
     topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
@@ -3603,14 +3602,14 @@ def test_ps_s3_multiple_topics_notification():
     # start an http server in a separate thread
     http_server = StreamingHTTPServer(hostname, http_port)
 
-    topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1,
+    topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
                           endpoint='amqp://' + hostname,
                           endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
     result, status = topic_conf1.set_config()
     parsed_result = json.loads(result)
     topic_arn1 = parsed_result['arn']
     assert_equal(status/100, 2)
-    topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2,
+    topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
                           endpoint='http://'+hostname+':'+str(http_port))
     result, status = topic_conf2.set_config()
     parsed_result = json.loads(result)
@@ -3618,9 +3617,9 @@ def test_ps_s3_multiple_topics_notification():
     assert_equal(status/100, 2)
 
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create s3 notification
     notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
     notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
@@ -3635,7 +3634,7 @@ def test_ps_s3_multiple_topics_notification():
             'TopicArn': topic_arn2,
             'Events': ['s3:ObjectCreated:*']
         }]
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
     result, _ = s3_notification_conf.get_config()
@@ -3644,11 +3643,11 @@ def test_ps_s3_multiple_topics_notification():
     assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2)
 
     # get auto-generated subscriptions
-    sub_conf1 = PSSubscription(ps_zones[0].conn, notification_name1,
+    sub_conf1 = PSSubscription(ps_zone.conn, notification_name1,
                                topic_name1)
     _, status = sub_conf1.get_config()
     assert_equal(status/100, 2)
-    sub_conf2 = PSSubscription(ps_zones[0].conn, notification_name2,
+    sub_conf2 = PSSubscription(ps_zone.conn, notification_name2,
                                topic_name2)
     _, status = sub_conf2.get_config()
     assert_equal(status/100, 2)
@@ -3659,7 +3658,7 @@ def test_ps_s3_multiple_topics_notification():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # get the events from both of the subscription
     result, _ = sub_conf1.get_events()
@@ -3688,6 +3687,6 @@ def test_ps_s3_multiple_topics_notification():
     # delete objects from the bucket
     for key in bucket.list():
         key.delete()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     http_server.close()
     clean_rabbitmq(rabbit_proc)