zonegroup_meta_checkpoint(zonegroup)
- ps_zones = []
- zones = []
+ ps_zone = None
+ master_zone = None
for conn in zonegroup_conns.zones:
+ if conn.zone == zonegroup.master_zone:
+ master_zone = conn
if is_ps_zone(conn):
zone_meta_checkpoint(conn.zone)
- ps_zones.append(conn)
- elif not conn.zone.is_read_only():
- zones.append(conn)
+ ps_zone = conn
- assert_not_equal(len(zones), 0)
+ assert_not_equal(master_zone, None)
if require_ps:
- assert_not_equal(len(ps_zones), 0)
- return zones, ps_zones
+ assert_not_equal(ps_zone, None)
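+ # callers now unpack a single pair, e.g. "master_zone, ps_zone = init_env()";
+ # ps_zone is only guaranteed to be set when require_ps is True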
+ return master_zone, ps_zone
def get_ip():
def test_ps_info():
""" log information for manual testing """
return SkipTest("only used in manual testing")
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
realm = get_realm()
zonegroup = realm.master_zonegroup()
bucket_name = gen_bucket_name()
- # create bucket on the first of the rados zones
+ # create bucket on the master zone
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
# create objects in the bucket
number_of_objects = 10
for i in range(number_of_objects):
print('user: ' + get_user())
print('tenant: ' + get_tenant())
print('Master Zone')
- print_connection_info(zones[0].conn)
+ print_connection_info(master_zone.conn)
print('PubSub Zone')
- print_connection_info(ps_zones[0].conn)
+ print_connection_info(ps_zone.conn)
print('Bucket: ' + bucket_name)
def test_ps_s3_notification_low_level():
""" test low level implementation of s3 notifications """
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
- # create bucket on the first of the rados zones
+ # create bucket on the master zone
- zones[0].create_bucket(bucket_name)
+ master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create topic
topic_name = bucket_name + TOPIC_SUFFIX
- topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ topic_conf = PSTopic(ps_zone.conn, topic_name)
result, status = topic_conf.set_config()
assert_equal(status/100, 2)
parsed_result = json.loads(result)
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
}]
- s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# get auto-generated topic
- generated_topic_conf = PSTopic(ps_zones[0].conn, generated_topic_name)
+ generated_topic_conf = PSTopic(ps_zone.conn, generated_topic_name)
result, status = generated_topic_conf.get_config()
parsed_result = json.loads(result)
assert_equal(status/100, 2)
assert_equal(parsed_result['topic']['name'], generated_topic_name)
# get auto-generated notification
- notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+ notification_conf = PSNotification(ps_zone.conn, bucket_name,
generated_topic_name)
result, status = notification_conf.get_config()
parsed_result = json.loads(result)
assert_equal(status/100, 2)
assert_equal(len(parsed_result['topics']), 1)
# get auto-generated subscription
- sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
+ sub_conf = PSSubscription(ps_zone.conn, notification_name,
generated_topic_name)
result, status = sub_conf.get_config()
parsed_result = json.loads(result)
# cleanup
topic_conf.del_config()
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
def test_ps_s3_notification_records():
""" test s3 records fetching """
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
- # create bucket on the first of the rados zones
+ # create bucket on the master zone
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create topic
topic_name = bucket_name + TOPIC_SUFFIX
- topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ topic_conf = PSTopic(ps_zone.conn, topic_name)
result, status = topic_conf.set_config()
assert_equal(status/100, 2)
parsed_result = json.loads(result)
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
}]
- s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# get auto-generated subscription
- sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
+ sub_conf = PSSubscription(ps_zone.conn, notification_name,
topic_name)
_, status = sub_conf.get_config()
assert_equal(status/100, 2)
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# get the events from the subscription
result, _ = sub_conf.get_events()
# delete the keys
for key in bucket.list():
key.delete()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
def test_ps_s3_notification():
""" test s3 notification set/get/delete """
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
- # create bucket on the first of the rados zones
+ # create bucket on the master zone
- zones[0].create_bucket(bucket_name)
+ master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
- topic_name = bucket_name + TOPIC_SUFFIX
# create topic
topic_name = bucket_name + TOPIC_SUFFIX
- topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ topic_conf = PSTopic(ps_zone.conn, topic_name)
response, status = topic_conf.set_config()
assert_equal(status/100, 2)
parsed_result = json.loads(response)
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
}]
- s3_notification_conf1 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf1 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf1.set_config()
assert_equal(status/100, 2)
# create another s3 notification with the same topic
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
}]
- s3_notification_conf2 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf2 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf2.set_config()
assert_equal(status/100, 2)
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
- # get all notification on a bucket
+ # get all notifications on a bucket
response, status = s3_notification_conf1.get_config()
# cleanup
topic_conf.del_config()
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
def test_ps_s3_topic_on_master():
""" test s3 topics set/get/delete on master """
- zones, _ = init_env(require_ps=False)
+ master_zone, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
# clean all topics
- delete_all_s3_topics(zones[0], zonegroup.name)
+ delete_all_s3_topics(master_zone, zonegroup.name)
# create s3 topics
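+ # endpoint args used below: amqp-exchange names the exchange to publish to,
+ # and amqp-ack-level=none means delivery is not acked by the broker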
endpoint_address = 'amqp://127.0.0.1:7001'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
- topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf1.set_config()
assert_equal(topic_arn,
'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_1')
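+ # topic ARNs follow the layout: arn:aws:sns:<zonegroup>:<tenant>:<topic name>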
endpoint_address = 'http://127.0.0.1:9001'
endpoint_args = 'push-endpoint='+endpoint_address
- topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf2.set_config()
assert_equal(topic_arn,
'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_2')
endpoint_address = 'http://127.0.0.1:9002'
endpoint_args = 'push-endpoint='+endpoint_address
- topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf3.set_config()
assert_equal(topic_arn,
'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3')
def test_ps_s3_topic_with_secret_on_master():
""" test s3 topics with secret set/get/delete on master """
- zones, _ = init_env(require_ps=False)
- if zones[0].secure_conn is None:
+ master_zone, _ = init_env(require_ps=False)
+ if master_zone.secure_conn is None:
return SkipTest('secure connection is needed to test topic with secrets')
realm = get_realm()
topic_name = bucket_name + TOPIC_SUFFIX
# clean all topics
- delete_all_s3_topics(zones[0], zonegroup.name)
+ delete_all_s3_topics(master_zone, zonegroup.name)
# create s3 topics
endpoint_address = 'amqp://user:password@127.0.0.1:7001'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
- bad_topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ bad_topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
try:
result = bad_topic_conf.set_config()
except Exception as err:
else:
assert False, 'user password configuration set allowed only over HTTPS'
- topic_conf = PSTopicS3(zones[0].secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
assert_equal(topic_arn,
def test_ps_s3_notification_on_master():
""" test s3 notification set/get/delete on master """
- zones, _ = init_env(require_ps=False)
+ master_zone, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
bucket_name = gen_bucket_name()
# create bucket
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
# create s3 topic
endpoint_address = 'amqp://127.0.0.1:7001'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
- topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
'TopicArn': topic_arn,
'Events': []
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
# cleanup
topic_conf.del_config()
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
def ps_s3_notification_filter(on_master):
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
if on_master:
- zones, _ = init_env(require_ps=False)
- ps_zone = zones[0]
+ master_zone, _ = init_env(require_ps=False)
+ ps_zone = master_zone
else:
- zones, ps_zones = init_env(require_ps=True)
- ps_zone = ps_zones[0]
+ master_zone, ps_zone = init_env(require_ps=True)
realm = get_realm()
zonegroup = realm.master_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
# start amqp receivers
print('wait for 5sec for the messages...')
time.sleep(5)
else:
- zone_bucket_checkpoint(ps_zone.zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
found_in1 = []
found_in2 = []
# delete the bucket
for key in bucket.list():
key.delete()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
stop_amqp_receiver(receiver, task)
clean_rabbitmq(proc)
def test_ps_s3_notification_errors_on_master():
""" test s3 notification set/get/delete on master """
- zones, _ = init_env(require_ps=False)
+ master_zone, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
bucket_name = gen_bucket_name()
# create bucket
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
# create s3 topic
endpoint_address = 'amqp://127.0.0.1:7001'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
- topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification with invalid event name
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:Kaboom']
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
try:
result, status = s3_notification_conf.set_config()
except Exception as error:
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:Put']
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
try:
_, _ = s3_notification_conf.set_config()
except Exception as error:
'TopicArn': invalid_topic_arn,
'Events': ['s3:ObjectCreated:Put']
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
try:
_, _ = s3_notification_conf.set_config()
except Exception as error:
'TopicArn': invalid_topic_arn ,
'Events': ['s3:ObjectCreated:Put']
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
try:
_, _ = s3_notification_conf.set_config()
except Exception as error:
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:Put']
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, 'kaboom', topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, 'kaboom', topic_conf_list)
try:
_, _ = s3_notification_conf.set_config()
except Exception as error:
# cleanup
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
def test_objcet_timing():
return SkipTest("only used in manual testing")
- zones, _ = init_env(require_ps=False)
+ master_zone, _ = init_env(require_ps=False)
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
# create objects in the bucket (async)
print('creating objects...')
number_of_objects = 1000
print('average time for object deletion: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
# cleanup
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
def test_ps_s3_notification_push_amqp_on_master():
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
- zones, _ = init_env(require_ps=False)
+ master_zone, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'
endpoint_address = 'amqp://' + hostname
# with acks from broker
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
- topic_conf1 = PSTopicS3(zones[0].conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf1 = PSTopicS3(master_zone.conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
topic_arn1 = topic_conf1.set_config()
# without acks from broker
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=none'
- topic_conf2 = PSTopicS3(zones[0].conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf2 = PSTopicS3(master_zone.conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
topic_arn2 = topic_conf2.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
'Events': ['s3:ObjectCreated:*']
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
topic_conf1.del_config()
topic_conf2.del_config()
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
clean_rabbitmq(proc)
if kafka_proc is None or zk_proc is None:
return SkipTest('end2end kafka tests require kafka/zookeeper installed')
- zones, ps_zones = init_env(require_ps=True)
+ master_zone, ps_zone = init_env()
realm = get_realm()
zonegroup = realm.master_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# name is constant for manual testing
topic_name = bucket_name+'_topic'
# create consumer on the topic
task.start()
# create topic
- topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+ topic_conf = PSTopic(ps_zone.conn, topic_name,
endpoint='kafka://' + kafka_server,
endpoint_args='kafka-ack-level=broker')
result, status = topic_conf.set_config()
'Events': []
}]
- s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
[thr.join() for thr in client_threads]
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
keys = list(bucket.list())
receiver.verify_s3_events(keys, exact_match=True)
client_threads.append(thr)
[thr.join() for thr in client_threads]
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
receiver.verify_s3_events(keys, exact_match=True, deletions=True)
# cleanup
s3_notification_conf.del_config()
topic_conf.del_config()
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
stop_kafka_receiver(receiver, task)
clean_kafka(kafka_proc, zk_proc, kafka_log)
kafka_proc, zk_proc, kafka_log = init_kafka()
if kafka_proc is None or zk_proc is None:
return SkipTest('end2end kafka tests require kafka/zookeeper installed')
- zones, _ = init_env(require_ps=False)
+ master_zone, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
# name is constant for manual testing
topic_name = bucket_name+'_topic'
# create consumer on the topic
endpoint_address = 'kafka://' + kafka_server
- # without acks from broker
+ # with acks from broker
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
- topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
topic_arn1 = topic_conf1.set_config()
+ # without acks from broker
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
- topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
topic_arn2 = topic_conf2.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
'Events': []
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
topic_conf1.del_config()
topic_conf2.del_config()
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
stop_kafka_receiver(receiver, task)
clean_kafka(kafka_proc, zk_proc, kafka_log)
""" test pushing kafka s3 notification on master """
if skip_push_tests:
return SkipTest("PubSub push tests don't run in teuthology")
- zones, _ = init_env(require_ps=False)
- if security_type == 'SSL_SASL' and zones[0].secure_conn is None:
+ master_zone, _ = init_env(require_ps=False)
+ if security_type == 'SSL_SASL' and master_zone.secure_conn is None:
return SkipTest("secure connection is needed to test SASL_SSL security")
kafka_proc, zk_proc, kafka_log = init_kafka()
if kafka_proc is None or zk_proc is None:
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
# name is constant for manual testing
topic_name = bucket_name+'_topic'
# create consumer on the topic
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none&use-ssl=true&ca-location='+KAFKA_DIR+'rootCA.crt'
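+ # use-ssl=true with ca-location points the TLS connection at the test root CA under KAFKA_DIR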
if security_type == 'SSL_SASL':
- topic_conf = PSTopicS3(zones[0].secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
else:
- topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
'Events': []
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
s3_notification_conf.set_config()
# create objects in the bucket (async)
# delete the bucket
for key in bucket.list():
key.delete()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
stop_kafka_receiver(receiver, task)
clean_kafka(kafka_proc, zk_proc, kafka_log)
if skip_push_tests:
return SkipTest("PubSub push tests don't run in teuthology")
hostname = get_ip()
- zones, _ = init_env(require_ps=False)
+ master_zone, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
# create s3 topic
endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address
- topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
'TopicArn': topic_arn,
'Events': []
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
topic_conf.del_config()
s3_notification_conf.del_config(notification=notification_name)
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
http_server.close()
if skip_push_tests:
return SkipTest("PubSub push tests don't run in teuthology")
hostname = get_ip()
- zones, ps_zones = init_env(require_ps=True)
+ master_zone, ps_zone = init_env()
realm = get_realm()
zonegroup = realm.master_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create s3 topic
endpoint_address = 'http://'+host+':'+str(port)
opaque_data = 'http://1.2.3.4:8888'
endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data
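+ # OpaqueData set on the topic is expected to be echoed back verbatim in the delivered events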
- topic_conf = PSTopic(ps_zones[0].conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
+ topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
result, status = topic_conf.set_config()
assert_equal(status/100, 2)
parsed_result = json.loads(result)
'TopicArn': topic_arn,
'Events': []
}]
- s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
[thr.join() for thr in client_threads]
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# check http receiver
keys = list(bucket.list())
topic_conf.del_config()
s3_notification_conf.del_config(notification=notification_name)
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
http_server.close()
if skip_push_tests:
return SkipTest("PubSub push tests don't run in teuthology")
hostname = get_ip()
- zones, _ = init_env(require_ps=False)
+ master_zone, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
# create s3 topic
endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address
opaque_data = 'http://1.2.3.4:8888'
- topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data)
+ topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
'TopicArn': topic_arn,
'Events': []
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
topic_conf.del_config()
s3_notification_conf.del_config(notification=notification_name)
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
http_server.close()
def test_ps_topic():
""" test set/get/delete of topic """
- _, ps_zones = init_env()
+ _, ps_zone = init_env()
realm = get_realm()
zonegroup = realm.master_zonegroup()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
# create topic
- topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ topic_conf = PSTopic(ps_zone.conn, topic_name)
_, status = topic_conf.set_config()
assert_equal(status/100, 2)
# get topic
def test_ps_topic_with_endpoint():
""" test set topic with endpoint"""
- _, ps_zones = init_env()
+ _, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
# create topic
dest_endpoint = 'amqp://localhost:7001'
dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none'
- topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+ topic_conf = PSTopic(ps_zone.conn, topic_name,
endpoint=dest_endpoint,
endpoint_args=dest_args)
_, status = topic_conf.set_config()
def test_ps_notification():
""" test set/get/delete of notification """
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
# create topic
- topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ topic_conf = PSTopic(ps_zone.conn, topic_name)
topic_conf.set_config()
- # create bucket on the first of the rados zones
+ # create bucket on the master zone
- zones[0].create_bucket(bucket_name)
+ master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create notifications
- notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+ notification_conf = PSNotification(ps_zone.conn, bucket_name,
topic_name)
_, status = notification_conf.set_config()
assert_equal(status/100, 2)
# cleanup
topic_conf.del_config()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
def test_ps_notification_events():
""" test set/get/delete of notification on specific events"""
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
# create topic
- topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ topic_conf = PSTopic(ps_zone.conn, topic_name)
topic_conf.set_config()
- # create bucket on the first of the rados zones
+ # create bucket on the master zone
- zones[0].create_bucket(bucket_name)
+ master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create notifications
events = "OBJECT_CREATE,OBJECT_DELETE"
- notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+ notification_conf = PSNotification(ps_zone.conn, bucket_name,
topic_name,
events)
_, status = notification_conf.set_config()
# cleanup
notification_conf.del_config()
topic_conf.del_config()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
def test_ps_subscription():
""" test set/get/delete of subscription """
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
# create topic
- topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ topic_conf = PSTopic(ps_zone.conn, topic_name)
topic_conf.set_config()
- # create bucket on the first of the rados zones
+ # create bucket on the master zone
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create notifications
- notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+ notification_conf = PSNotification(ps_zone.conn, bucket_name,
topic_name)
_, status = notification_conf.set_config()
assert_equal(status/100, 2)
# create subscription
- sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+ sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
topic_name)
_, status = sub_conf.set_config()
assert_equal(status/100, 2)
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# get the create events from the subscription
result, _ = sub_conf.get_events()
for key in bucket.list():
key.delete()
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
- # get the delete events from the subscriptions
+ # get the delete events from the subscription
- result, _ = sub_conf.get_events()
- for event in events['events']:
- log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
+ #result, _ = sub_conf.get_events()
+ #for event in events['events']:
+ # log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
# TODO: check deletions
# TODO: use exact match
# verify_events_by_elements(events, keys, exact_match=False, deletions=True)
# cleanup
notification_conf.del_config()
topic_conf.del_config()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
def test_ps_event_type_subscription():
""" test subscriptions for different events """
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
# create topic for objects creation
topic_create_name = bucket_name+TOPIC_SUFFIX+'_create'
- topic_create_conf = PSTopic(ps_zones[0].conn, topic_create_name)
+ topic_create_conf = PSTopic(ps_zone.conn, topic_create_name)
topic_create_conf.set_config()
# create topic for objects deletion
topic_delete_name = bucket_name+TOPIC_SUFFIX+'_delete'
- topic_delete_conf = PSTopic(ps_zones[0].conn, topic_delete_name)
+ topic_delete_conf = PSTopic(ps_zone.conn, topic_delete_name)
topic_delete_conf.set_config()
# create topic for all events
topic_name = bucket_name+TOPIC_SUFFIX+'_all'
- topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ topic_conf = PSTopic(ps_zone.conn, topic_name)
topic_conf.set_config()
- # create bucket on the first of the rados zones
+ # create bucket on the master zone
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_meta_checkpoint(ps_zone.zone)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# create notifications for objects creation
- notification_create_conf = PSNotification(ps_zones[0].conn, bucket_name,
+ notification_create_conf = PSNotification(ps_zone.conn, bucket_name,
topic_create_name, "OBJECT_CREATE")
_, status = notification_create_conf.set_config()
assert_equal(status/100, 2)
# create notifications for objects deletion
- notification_delete_conf = PSNotification(ps_zones[0].conn, bucket_name,
+ notification_delete_conf = PSNotification(ps_zone.conn, bucket_name,
topic_delete_name, "OBJECT_DELETE")
_, status = notification_delete_conf.set_config()
assert_equal(status/100, 2)
# create notifications for all events
- notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+ notification_conf = PSNotification(ps_zone.conn, bucket_name,
topic_name, "OBJECT_DELETE,OBJECT_CREATE")
_, status = notification_conf.set_config()
assert_equal(status/100, 2)
# create subscription for objects creation
- sub_create_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_create',
+ sub_create_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_create',
topic_create_name)
_, status = sub_create_conf.set_config()
assert_equal(status/100, 2)
# create subscription for objects deletion
- sub_delete_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_delete',
+ sub_delete_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_delete',
topic_delete_name)
_, status = sub_delete_conf.set_config()
assert_equal(status/100, 2)
# create subscription for all events
- sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_all',
+ sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_all',
topic_name)
_, status = sub_conf.set_config()
assert_equal(status/100, 2)
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# get the events from the creation subscription
result, _ = sub_create_conf.get_events()
for key in bucket.list():
key.delete()
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
log.debug("Event (OBJECT_DELETE) synced")
- # get the events from the creations subscription
+ # get the events from the creation subscription
topic_create_conf.del_config()
topic_delete_conf.del_config()
topic_conf.del_config()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
def test_ps_event_fetching():
""" test incremental fetching of events from a subscription """
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
# create topic
- topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ topic_conf = PSTopic(ps_zone.conn, topic_name)
topic_conf.set_config()
- # create bucket on the first of the rados zones
+ # create bucket on the master zone
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create notifications
- notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+ notification_conf = PSNotification(ps_zone.conn, bucket_name,
topic_name)
_, status = notification_conf.set_config()
assert_equal(status/100, 2)
# create subscription
- sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+ sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
topic_name)
_, status = sub_conf.set_config()
assert_equal(status/100, 2)
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
max_events = 15
total_events_count = 0
next_marker = None
topic_conf.del_config()
for key in bucket.list():
key.delete()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
def test_ps_event_acking():
""" test acking of some events in a subscription """
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
# create topic
- topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ topic_conf = PSTopic(ps_zone.conn, topic_name)
topic_conf.set_config()
- # create bucket on the first of the rados zones
+ # create bucket on the master zone
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create notifications
- notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+ notification_conf = PSNotification(ps_zone.conn, bucket_name,
topic_name)
_, status = notification_conf.set_config()
assert_equal(status/100, 2)
# create subscription
- sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+ sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
topic_name)
_, status = sub_conf.set_config()
assert_equal(status/100, 2)
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# get the create events from the subscription
result, _ = sub_conf.get_events()
topic_conf.del_config()
for key in bucket.list():
key.delete()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
def test_ps_creation_triggers():
""" test object creation notifications in using put/copy/post """
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
# create topic
- topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ topic_conf = PSTopic(ps_zone.conn, topic_name)
topic_conf.set_config()
- # create bucket on the first of the rados zones
+ # create bucket on the master zone
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create notifications
- notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+ notification_conf = PSNotification(ps_zone.conn, bucket_name,
topic_name)
_, status = notification_conf.set_config()
assert_equal(status/100, 2)
# create subscription
- sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+ sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
topic_name)
_, status = sub_conf.set_config()
assert_equal(status/100, 2)
uploader.complete_upload()
fp.close()
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# get the create events from the subscription
result, _ = sub_conf.get_events()
topic_conf.del_config()
for key in bucket.list():
key.delete()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
def test_ps_s3_creation_triggers_on_master():
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
- zones, _ = init_env(require_ps=False)
+ master_zone, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
# start amqp receiver
# create s3 topic
endpoint_address = 'amqp://' + hostname
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
- topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy']
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
for key in bucket.list():
key.delete()
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
clean_rabbitmq(proc)
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
- zones, _ = init_env(require_ps=False)
+ master_zone, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
# start amqp receivers
# create s3 topics
endpoint_address = 'amqp://' + hostname
endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
- topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
topic_arn1 = topic_conf1.set_config()
- topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
topic_arn2 = topic_conf2.set_config()
- topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
topic_arn3 = topic_conf3.set_config()
# create s3 notifications
{'Id': notification_name+'_3', 'TopicArn': topic_arn3,
'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
for key in bucket.list():
key.delete()
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
clean_rabbitmq(proc)
def test_ps_versioned_deletion():
""" test notification of deletion markers """
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
# create topics
- topic_conf1 = PSTopic(ps_zones[0].conn, topic_name+'_1')
+ topic_conf1 = PSTopic(ps_zone.conn, topic_name+'_1')
_, status = topic_conf1.set_config()
assert_equal(status/100, 2)
- topic_conf2 = PSTopic(ps_zones[0].conn, topic_name+'_2')
+ topic_conf2 = PSTopic(ps_zone.conn, topic_name+'_2')
_, status = topic_conf2.set_config()
assert_equal(status/100, 2)
- # create bucket on the first of the rados zones
+ # create bucket on the master zone
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
bucket.configure_versioning(True)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create notifications
event_type1 = 'OBJECT_DELETE'
- notification_conf1 = PSNotification(ps_zones[0].conn, bucket_name,
+ notification_conf1 = PSNotification(ps_zone.conn, bucket_name,
topic_name+'_1',
event_type1)
_, status = notification_conf1.set_config()
assert_equal(status/100, 2)
event_type2 = 'DELETE_MARKER_CREATE'
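+ # on a versioned bucket, a plain delete creates a delete marker (DELETE_MARKER_CREATE),
+ # while deleting a specific version emits OBJECT_DELETE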
- notification_conf2 = PSNotification(ps_zones[0].conn, bucket_name,
+ notification_conf2 = PSNotification(ps_zone.conn, bucket_name,
topic_name+'_2',
event_type2)
_, status = notification_conf2.set_config()
assert_equal(status/100, 2)
# create subscriptions
- sub_conf1 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_1',
+ sub_conf1 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_1',
topic_name+'_1')
_, status = sub_conf1.set_config()
assert_equal(status/100, 2)
- sub_conf2 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_2',
+ sub_conf2 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_2',
topic_name+'_2')
_, status = sub_conf2.set_config()
assert_equal(status/100, 2)
delete_marker_key = bucket.delete_key(key.name)
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# delete the deletion marker
delete_marker_key.delete()
bucket.delete_key(key.name, version_id=v1)
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# get the delete events from the subscription
result, _ = sub_conf1.get_events()
zonegroup_conns = ZonegroupConns(zonegroup)
try:
zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
except:
log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket')
sub_conf1.del_config()
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
- zones, _ = init_env(require_ps=False)
+ master_zone, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
# start amqp receiver
# create s3 topic
endpoint_address = 'amqp://' + hostname
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
- topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
}
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
s3_notification_conf.del_config()
topic_conf.del_config()
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
clean_rabbitmq(proc)
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
- zones, _ = init_env(require_ps=False)
+ master_zone, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
# start amqp receiver
# create s3 topic
endpoint_address = 'amqp://' + hostname
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
- topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
}
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
# create objects in the bucket with tags
tags = 'hello=world&ka=boom'
key_name1 = 'key1'
- put_object_tagging(zones[0].conn, bucket_name, key_name1, tags)
+ put_object_tagging(master_zone.conn, bucket_name, key_name1, tags)
tags = 'foo=bar&ka=boom'
key_name2 = 'key2'
- put_object_tagging(zones[0].conn, bucket_name, key_name2, tags)
+ put_object_tagging(master_zone.conn, bucket_name, key_name2, tags)
key_name3 = 'key3'
key = bucket.new_key(key_name3)
key.set_contents_from_string('bar')
expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}]
# check amqp receiver
for event in receiver.get_and_reset_events():
- obj_tags = event['s3']['object']['tags']
+ obj_tags = event['Records'][0]['s3']['object']['tags']
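+ # events are delivered in S3 record format, so object fields are nested under Records[0]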
assert_equal(obj_tags[0], expected_tags[0])
# delete the objects
time.sleep(5)
# check amqp receiver
for event in receiver.get_and_reset_events():
- obj_tags = event['s3']['object']['tags']
+ obj_tags = event['Records'][0]['s3']['object']['tags']
assert_equal(obj_tags[0], expected_tags[0])
# cleanup
s3_notification_conf.del_config()
topic_conf.del_config()
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
clean_rabbitmq(proc)
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
- zones, _ = init_env(require_ps=False)
+ master_zone, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
bucket.configure_versioning(True)
topic_name = bucket_name + TOPIC_SUFFIX
# create s3 topic
endpoint_address = 'amqp://' + hostname
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
- topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
{'Id': notification_name+'_3', 'TopicArn': topic_arn,
'Events': ['s3:ObjectRemoved:Delete']
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
s3_notification_conf.del_config()
topic_conf.del_config()
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
clean_rabbitmq(proc)
""" test pushing to http endpoint """
if skip_push_tests:
return SkipTest("PubSub push tests don't run in teuthology")
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
http_server = StreamingHTTPServer(host, port)
# create topic
- topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ topic_conf = PSTopic(ps_zone.conn, topic_name)
_, status = topic_conf.set_config()
assert_equal(status/100, 2)
- # create bucket on the first of the rados zones
+ # create bucket on the master zone
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create notifications
- notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+ notification_conf = PSNotification(ps_zone.conn, bucket_name,
topic_name)
_, status = notification_conf.set_config()
assert_equal(status/100, 2)
# create subscription
- sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+ sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
topic_name, endpoint='http://'+host+':'+str(port))
_, status = sub_conf.set_config()
assert_equal(status/100, 2)
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# check http server
keys = list(bucket.list())
# TODO: use exact match
for key in bucket.list():
key.delete()
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_meta_checkpoint(ps_zone.zone)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# check http server
# TODO: use exact match
http_server.verify_events(keys, deletions=True, exact_match=False)
sub_conf.del_config()
notification_conf.del_config()
topic_conf.del_config()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
http_server.close()
""" test pushing to http endpoint s3 record format"""
if skip_push_tests:
return SkipTest("PubSub push tests don't run in teuthology")
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
http_server = StreamingHTTPServer(host, port)
# create topic
- topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+ topic_conf = PSTopic(ps_zone.conn, topic_name,
endpoint='http://'+host+':'+str(port))
result, status = topic_conf.set_config()
assert_equal(status/100, 2)
parsed_result = json.loads(result)
topic_arn = parsed_result['arn']
- # create bucket on the first of the rados zones
+ # create bucket on the master zone
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
}]
- s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
# create objects in the bucket
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# check http server
keys = list(bucket.list())
# TODO: use exact match
for key in bucket.list():
key.delete()
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_meta_checkpoint(ps_zone.zone)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# check http server
# TODO: use exact match
http_server.verify_s3_events(keys, deletions=True, exact_match=False)
# cleanup
s3_notification_conf.del_config()
topic_conf.del_config()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
http_server.close()
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
exchange = 'ex1'
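# start amqp receiver in a separate thread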
task, receiver = create_amqp_receiver_thread(exchange, topic_name)
task.start()
- topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ topic_conf = PSTopic(ps_zone.conn, topic_name)
_, status = topic_conf.set_config()
assert_equal(status/100, 2)
- # create bucket on the first of the rados zones
- bucket = zones[0].create_bucket(bucket_name)
+ # create bucket on the master zone
+ bucket = master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create notifications
- notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+ notification_conf = PSNotification(ps_zone.conn, bucket_name,
topic_name)
_, status = notification_conf.set_config()
assert_equal(status/100, 2)
# create subscription
- sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+ sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
topic_name, endpoint='amqp://'+hostname,
endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=broker')
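# ack-level 'broker' means an event counts as delivered only after the broker acks it; 'none' does not wait for acks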
_, status = sub_conf.set_config()
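# create objects in the bucket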
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# check amqp receiver
keys = list(bucket.list())
# TODO: use exact match
for key in bucket.list():
key.delete()
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_meta_checkpoint(ps_zone.zone)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# check amqp receiver
# TODO: use exact match
receiver.verify_events(keys, deletions=True, exact_match=False)
sub_conf.del_config()
notification_conf.del_config()
topic_conf.del_config()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
clean_rabbitmq(proc)
proc = init_rabbitmq()
if proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
exchange = 'ex1'
task, receiver = create_amqp_receiver_thread(exchange, topic_name)
task.start()
- topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+ topic_conf = PSTopic(ps_zone.conn, topic_name,
endpoint='amqp://' + hostname,
endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
result, status = topic_conf.set_config()
parsed_result = json.loads(result)
topic_arn = parsed_result['arn']
- # create bucket on the first of the rados zones
- bucket = zones[0].create_bucket(bucket_name)
+ # create bucket on the master zone
+ bucket = master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
}]
- s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
# create objects in the bucket
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# check amqp receiver
keys = list(bucket.list())
# TODO: use exact match
for key in bucket.list():
key.delete()
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_meta_checkpoint(ps_zone.zone)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# check amqp receiver
# TODO: use exact match
receiver.verify_s3_events(keys, deletions=True, exact_match=False)
stop_amqp_receiver(receiver, task)
s3_notification_conf.del_config()
topic_conf.del_config()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
clean_rabbitmq(proc)
def test_ps_delete_bucket():
""" test notification status upon bucket deletion """
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
- # create bucket on the first of the rados zones
- bucket = zones[0].create_bucket(bucket_name)
+ # create bucket on the master zone
+ bucket = master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
- topic_name = bucket_name + TOPIC_SUFFIX
# create topic
topic_name = bucket_name + TOPIC_SUFFIX
- topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+ topic_conf = PSTopic(ps_zone.conn, topic_name)
response, status = topic_conf.set_config()
assert_equal(status/100, 2)
parsed_result = json.loads(response)
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
}]
- s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
# create non-s3 notification
- notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+ notification_conf = PSNotification(ps_zone.conn, bucket_name,
topic_name)
_, status = notification_conf.set_config()
assert_equal(status/100, 2)
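# create objects in the bucket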
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for bucket sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
keys = list(bucket.list())
# delete objects from the bucket
for key in bucket.list():
key.delete()
# wait for bucket sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
# wait for meta sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# get the events from the auto-generated subscription
- sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
+ sub_conf = PSSubscription(ps_zone.conn, notification_name,
topic_name)
result, _ = sub_conf.get_events()
records = json.loads(result)
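# events generated before the bucket deletion are expected to remain available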
def test_ps_missing_topic():
""" test creating a subscription when no topic info exists"""
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
- # create bucket on the first of the rados zones
- zones[0].create_bucket(bucket_name)
+ # create bucket on the master zone
+ master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
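# construct the topic ARN by hand; the topic itself is never created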
topic_arn = 'arn:aws:sns:::' + topic_name
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
}]
- s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
try:
s3_notification_conf.set_config()
- except:
- assert 'missing topic is expected'
+ except Exception:
+ # failure is expected: the topic referenced by the ARN does not exist
+ pass
# cleanup
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
def test_ps_s3_topic_update():
rabbit_proc = init_rabbitmq()
if rabbit_proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name = bucket_name+TOPIC_SUFFIX
exchange = 'ex1'
amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name)
amqp_task.start()
- topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+ topic_conf = PSTopic(ps_zone.conn, topic_name,
endpoint='amqp://' + hostname,
endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
result, status = topic_conf.set_config()
http_server = StreamingHTTPServer(hostname, port)
- # create bucket on the first of the rados zones
- bucket = zones[0].create_bucket(bucket_name)
+ # create bucket on the master zone
+ bucket = master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
}]
- s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
# create objects in the bucket
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
keys = list(bucket.list())
# TODO: use exact match
receiver.verify_s3_events(keys, exact_match=False)
# update the same topic with new endpoint
- topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+ topic_conf = PSTopic(ps_zone.conn, topic_name,
endpoint='http://'+ hostname + ':' + str(port))
_, status = topic_conf.set_config()
assert_equal(status/100, 2)
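# create more objects in the bucket after the topic endpoint was updated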
key = bucket.new_key(str(i+100))
key.set_contents_from_string('bar')
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_meta_checkpoint(ps_zone.zone)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
keys = list(bucket.list())
# verify that notifications are still sent to amqp
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
}]
- s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
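# create more objects after recreating the notification with the updated topic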
key = bucket.new_key(str(i+200))
key.set_contents_from_string('bar')
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_meta_checkpoint(ps_zone.zone)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
keys = list(bucket.list())
# check that updates switched to http
key.delete()
s3_notification_conf.del_config()
topic_conf.del_config()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
http_server.close()
clean_rabbitmq(rabbit_proc)
if rabbit_proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
# start an http server in a separate thread
http_server = StreamingHTTPServer(hostname, http_port)
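# create two topics: one with an amqp endpoint, one with an http endpoint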
- topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1,
+ topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
endpoint='amqp://' + hostname,
endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
result, status = topic_conf1.set_config()
parsed_result = json.loads(result)
topic_arn1 = parsed_result['arn']
assert_equal(status/100, 2)
- topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2,
+ topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
endpoint='http://'+hostname+':'+str(http_port))
result, status = topic_conf2.set_config()
parsed_result = json.loads(result)
assert_equal(status/100, 2)
- # create bucket on the first of the rados zones
- bucket = zones[0].create_bucket(bucket_name)
+ # create bucket on the master zone
+ bucket = master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create s3 notification with topic1
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn1,
'Events': ['s3:ObjectCreated:*']
}]
- s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
# create objects in the bucket
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
keys = list(bucket.list())
# TODO: use exact match
'TopicArn': topic_arn2,
'Events': ['s3:ObjectCreated:*']
}]
- s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
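# create more objects after pointing the notification at the http topic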
key = bucket.new_key(str(i+100))
key.set_contents_from_string('bar')
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_meta_checkpoint(ps_zone.zone)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
keys = list(bucket.list())
# check that updates switched to http
s3_notification_conf.del_config()
topic_conf1.del_config()
topic_conf2.del_config()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
http_server.close()
clean_rabbitmq(rabbit_proc)
if rabbit_proc is None:
return SkipTest('end2end amqp tests require rabbitmq-server installed')
- zones, ps_zones = init_env()
+ master_zone, ps_zone = init_env()
bucket_name = gen_bucket_name()
topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
# start an http server in a separate thread
http_server = StreamingHTTPServer(hostname, http_port)
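# create one amqp topic and one http topic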
- topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1,
+ topic_conf1 = PSTopic(ps_zone.conn, topic_name1,
endpoint='amqp://' + hostname,
endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
result, status = topic_conf1.set_config()
parsed_result = json.loads(result)
topic_arn1 = parsed_result['arn']
assert_equal(status/100, 2)
- topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2,
+ topic_conf2 = PSTopic(ps_zone.conn, topic_name2,
endpoint='http://'+hostname+':'+str(http_port))
result, status = topic_conf2.set_config()
parsed_result = json.loads(result)
assert_equal(status/100, 2)
- # create bucket on the first of the rados zones
- bucket = zones[0].create_bucket(bucket_name)
+ # create bucket on the master zone
+ bucket = master_zone.create_bucket(bucket_name)
# wait for sync
- zone_meta_checkpoint(ps_zones[0].zone)
+ zone_meta_checkpoint(ps_zone.zone)
# create s3 notification
notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
'TopicArn': topic_arn2,
'Events': ['s3:ObjectCreated:*']
}]
- s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
_, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
result, _ = s3_notification_conf.get_config()
assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2)
# get auto-generated subscriptions
- sub_conf1 = PSSubscription(ps_zones[0].conn, notification_name1,
+ sub_conf1 = PSSubscription(ps_zone.conn, notification_name1,
topic_name1)
_, status = sub_conf1.get_config()
assert_equal(status/100, 2)
- sub_conf2 = PSSubscription(ps_zones[0].conn, notification_name2,
+ sub_conf2 = PSSubscription(ps_zone.conn, notification_name2,
topic_name2)
_, status = sub_conf2.get_config()
assert_equal(status/100, 2)
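# create objects in the bucket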
key = bucket.new_key(str(i))
key.set_contents_from_string('bar')
# wait for sync
- zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+ zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
# get the events from both of the subscriptions
result, _ = sub_conf1.get_events()
# delete objects from the bucket
for key in bucket.list():
key.delete()
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
http_server.close()
clean_rabbitmq(rabbit_proc)