log.info('waited for %ds for queue %s to drain', time_diff, topic_name)
-@attr('basic_test')
-def test_ps_s3_persistent_topic_stats():
- """ test persistent topic stats """
- conn = connection()
+def persistent_topic_stats(conn, endpoint_type):
zonegroup = get_config_zonegroup()
- # create random port for the http server
- host = get_ip()
- port = random.randint(10000, 20000)
-
# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
+ host = get_ip()
+ task = None
+ port = None
+ if endpoint_type == 'http':
+ # create random port for the http server
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ receiver = HTTPServerWithEvents((host, port))
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
+ '&retry_sleep_duration=1'
+ elif endpoint_type == 'amqp':
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+ endpoint_address = 'amqp://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true'+ \
+ '&retry_sleep_duration=1'
+ elif endpoint_type == 'kafka':
+ # start kafka receiver
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
+ endpoint_address = 'kafka://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+ '&retry_sleep_duration=1'
+ else:
+ return SkipTest('Unknown endpoint type: ' + endpoint_type)
+
# create s3 topic
- endpoint_address = 'http://'+host+':'+str(port)
- endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
- '&retry_sleep_duration=1'
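+ # use an endpoint with a wrong port so that events stay queued and show up in the topic stats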
+ endpoint_address = 'kafka://' + host + ':1234' # wrong port
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+ '&retry_sleep_duration=1'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
# topic stats
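+ # 2 events per object are expected in the queue: one for the creation and one for the deletion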
get_stats_persistent_topic(topic_name, 2 * number_of_objects)
- # start an http server in a separate thread
- http_server = HTTPServerWithEvents((host, port))
+ # change the endpoint port
+ endpoint_address = 'kafka://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+ '&retry_sleep_duration=1'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
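+ # now that the endpoint is reachable, the queued notifications can be delivered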
wait_for_queue_to_drain(topic_name, http_port=port)
topic_conf.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
- http_server.close()
+ receiver.close(task)
-@attr('basic_test')
+
+@attr('http_test')
+def test_persistent_topic_stats_http():
+ """ test persistent topic stats, http endpoint """
+ conn = connection()
+ persistent_topic_stats(conn, 'http')
+
+
+@attr('kafka_test')
+def test_persistent_topic_stats_kafka():
+ """ test persistent topic stats, kafka endpoint """
+ conn = connection()
+ persistent_topic_stats(conn, 'kafka')
+
+
+@attr('kafka_test')
def test_persistent_topic_dump():
""" test persistent topic dump """
conn = connection()
zonegroup = get_config_zonegroup()
- # create random port for the http server
- host = get_ip()
- port = random.randint(10000, 20000)
-
# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
+ # start kafka receiver
+ host = get_ip()
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
+
# create s3 topic
- endpoint_address = 'http://'+host+':'+str(port)
- endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
- '&retry_sleep_duration=1'
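+ # use an unresolvable host so that events stay queued and can be dumped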
+ endpoint_address = 'kafka://WrongHost' # wrong host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+ '&retry_sleep_duration=1'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
parsed_result = json.loads(result[0])
assert_equal(len(parsed_result), 2*number_of_objects)
- # start an http server in a separate thread
- http_server = HTTPServerWithEvents((host, port))
+ # change the endpoint to the correct host
+ endpoint_address = 'kafka://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+ '&retry_sleep_duration=1'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
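+ # once the endpoint is valid, the queue should drain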
- wait_for_queue_to_drain(topic_name, http_port=port)
+ wait_for_queue_to_drain(topic_name)
result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
assert_equal(result[1], 0)
topic_conf.del_config()
# delete the bucket
conn.delete_bucket(bucket_name)
- http_server.close()
+ receiver.close(task)
def ps_s3_persistent_topic_configs(persistency_time, config_dict):
http_server.close()
-@attr('http_test')
-def test_ps_s3_persistent_multiple_endpoints():
- """ test pushing persistent notification when one of the endpoints has error """
- conn = connection()
+def persistent_topic_multiple_endpoints(conn, endpoint_type):
zonegroup = get_config_zonegroup()
- # create random port for the http server
- host = get_ip()
- port = random.randint(10000, 20000)
- # start an http server in a separate thread
- number_of_objects = 10
- http_server = HTTPServerWithEvents((host, port))
-
# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
+ topic_name_1 = topic_name+'_1'
+
+ host = get_ip()
+ task = None
+ port = None
+ if endpoint_type == 'http':
+ # create random port for the http server
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ receiver = HTTPServerWithEvents((host, port))
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
+ '&retry_sleep_duration=1'
+ elif endpoint_type == 'amqp':
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name_1)
+ task.start()
+ endpoint_address = 'amqp://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true'+ \
+ '&retry_sleep_duration=1'
+ elif endpoint_type == 'kafka':
+ # start kafka receiver
+ task, receiver = create_kafka_receiver_thread(topic_name_1)
+ task.start()
+ endpoint_address = 'kafka://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+ '&retry_sleep_duration=1'
+ else:
+ return SkipTest('Unknown endpoint type: ' + endpoint_type)
# create two s3 topics
- endpoint_address = 'http://'+host+':'+str(port)
- endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
- '&retry_sleep_duration=1'
- topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+ topic_conf1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
topic_arn1 = topic_conf1.set_config()
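+ # the second topic deliberately points at an unresolvable endpoint so that its deliveries fail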
endpoint_address = 'http://kaboom:9999'
endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
- '&retry_sleep_duration=1'
+ '&retry_sleep_duration=1'
topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
topic_arn2 = topic_conf2.set_config()
client_threads = []
start_time = time.time()
+ number_of_objects = 10
for i in range(number_of_objects):
key = bucket.new_key(str(i))
content = str(os.urandom(1024*1024))
keys = list(bucket.list())
- wait_for_queue_to_drain(topic_name+'_1')
-
- http_server.verify_s3_events(keys, exact_match=True, deletions=False)
+ wait_for_queue_to_drain(topic_name_1, http_port=port)
+ receiver.verify_s3_events(keys, exact_match=True, deletions=False)
# delete objects from the bucket
client_threads = []
client_threads.append(thr)
[thr.join() for thr in client_threads]
- wait_for_queue_to_drain(topic_name+'_1')
-
- http_server.verify_s3_events(keys, exact_match=True, deletions=True)
+ wait_for_queue_to_drain(topic_name_1, http_port=port)
+ receiver.verify_s3_events(keys, exact_match=True, deletions=True)
# cleanup
s3_notification_conf1.del_config()
s3_notification_conf2.del_config()
topic_conf2.del_config()
conn.delete_bucket(bucket_name)
- http_server.close()
+ receiver.close(task)
+
+
+@attr('http_test')
+def test_persistent_multiple_endpoints_http():
+ """ test pushing persistent notification when one of the endpoints has error, http endpoint """
+ conn = connection()
+ persistent_topic_multiple_endpoints(conn, 'http')
+
+
+@attr('kafka_test')
+def test_persistent_multiple_endpoints_kafka():
+ """ test pushing persistent notification when one of the endpoints has error, kafka endpoint """
+ conn = connection()
+ persistent_topic_multiple_endpoints(conn, 'kafka')
+
def persistent_notification(endpoint_type, conn, account=None):
""" test pushing persistent notification """
http_server.close()
-@attr('data_path_v2_test')
-def test_persistent_ps_s3_data_path_v2_migration():
+def persistent_data_path_v2_migration(conn, endpoint_type):
""" test data path v2 persistent migration """
if get_config_cluster() == 'noname':
return SkipTest('realm is needed for migration test')
- conn = connection()
zonegroup = get_config_zonegroup()
- # create random port for the http server
- host = get_ip()
- http_port = random.randint(10000, 20000)
-
# disable v2 notification
zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
- # create s3 topic
- endpoint_address = 'http://'+host+':'+str(http_port)
- endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
- '&retry_sleep_duration=1'
+ host = get_ip()
+ task = None
+ port = None
+ if endpoint_type == 'http':
+ # create random port for the http server
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ receiver = HTTPServerWithEvents((host, port))
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
+ '&retry_sleep_duration=1'
+ elif endpoint_type == 'amqp':
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+ endpoint_address = 'amqp://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker&persistent=true'+ \
+ '&retry_sleep_duration=1'
+ elif endpoint_type == 'kafka':
+ # start kafka receiver
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
+ endpoint_address = 'kafka://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
+ '&retry_sleep_duration=1'
+ else:
+ return SkipTest('Unknown endpoint type: ' + endpoint_type)
+
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
# topic stats
get_stats_persistent_topic(topic_name, 2 * number_of_objects)
- # start an http server in a separate thread
- http_server = HTTPServerWithEvents((host, http_port))
-
- wait_for_queue_to_drain(topic_name, http_port=http_port)
+ wait_for_queue_to_drain(topic_name, http_port=port)
# verify events
keys = list(bucket.list())
# exact match is false because the notifications are persistent.
- http_server.verify_s3_events(keys, exact_match=False)
+ receiver.verify_s3_events(keys, exact_match=False)
except Exception as e:
assert False, str(e)
[thr.join() for thr in client_threads]
# delete the bucket
conn.delete_bucket(bucket_name)
- if http_server:
- http_server.close()
+ receiver.close(task)
+
+
+@attr('data_path_v2_test')
+def test_persistent_data_path_v2_migration_http():
+ """ test data path v2 persistent migration, http endpoint """
+ conn = connection()
+ persistent_data_path_v2_migration(conn, 'http')
+
+
+@attr('data_path_v2_kafka_test')
+def test_persistent_data_path_v2_migration_kafka():
+ """ test data path v2 persistent migration, kafka endpoint """
+ conn = connection()
+ persistent_data_path_v2_migration(conn, 'kafka')
@attr('data_path_v2_test')