From: Hoai-Thu Vuong
Date: Tue, 9 Jul 2024 16:56:51 +0000 (+0700)
Subject: rgw: allow bucket notifications to send messages to kafka with multiple brokers
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2f79424f3292d144b4c2512531cec6e8d5d373f6;p=ceph.git

rgw: allow bucket notifications to send messages to kafka with multiple brokers

- add a new parameter to define the list of brokers
- update the documentation
- change nose to nose-py3
- add a test case for multiple brokers (happy path)

Signed-off-by: Hoai-Thu Vuong

(cherry picked from commit 1a34cd1be9f2c10a8262372e0a5bab396655bdab)

Conflicts:
	doc/radosgw/notifications.rst
	qa/tasks/notification_tests.py
	src/rgw/driver/rados/rgw_pubsub_push.cc
	src/rgw/rgw_kafka.cc
	src/rgw/rgw_kafka.h
	src/test/rgw/bucket_notification/test_bn.py

- update the docs with the new params
- add only kafka-failover
- update the params of the connect function of rgw_kafka
- resolve conflicts by keeping two functions:
  - test_ps_s3_notification_push_amqp_on_master
  - notification_push
---

diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst
index e13d1ced4473..5380150baa98 100644
--- a/doc/radosgw/notifications.rst
+++ b/doc/radosgw/notifications.rst
@@ -164,6 +164,7 @@ updating, use the name of an existing topic and different endpoint values).
    [&Attributes.entry.13.key=max_retries&Attributes.entry.13.value=]
    [&Attributes.entry.14.key=retry_sleep_duration&Attributes.entry.14.value=]
    [&Attributes.entry.15.key=Policy&Attributes.entry.15.value=]
+   [&Attributes.entry.18.key=kafka-brokers&Attributes.entry.18.value=]
 
 Request parameters:
 
@@ -240,6 +241,8 @@ Request parameters:
 
   - "broker": Messages are considered "delivered" if acked by the broker. (This is the default.)
 
+- kafka-brokers: A comma-separated list of host:port values of kafka brokers. These brokers (which may include the broker defined in the kafka URI) will be added to the kafka URI to support sending notifications to a kafka cluster.
+
 .. note::
 
    - The key-value pair of a specific parameter need not reside in the same
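For illustration, a topic using the new attribute could be created through RGW's SNS-compatible API, e.g. with boto3 (a minimal sketch; the endpoint URL, credentials, topic name, and broker addresses are placeholders, not values from this patch):

    import boto3

    # RGW's SNS-compatible endpoint; URL and credentials are placeholders
    client = boto3.client('sns',
                          endpoint_url='http://localhost:8000',
                          aws_access_key_id='ACCESS_KEY',
                          aws_secret_access_key='SECRET_KEY',
                          region_name='default')

    # the kafka:// URI names one broker; kafka-brokers appends the rest of the cluster
    client.create_topic(Name='mytopic',
                        Attributes={'push-endpoint': 'kafka://localhost:9092',
                                    'kafka-ack-level': 'broker',
                                    'kafka-brokers': 'localhost:19092'})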
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/+ b/qa/suites/rgw/notifications/tasks/kafka_failover/+
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml b/qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml
new file mode 100644
index 000000000000..5c83d5c0d23f
--- /dev/null
+++ b/qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml
@@ -0,0 +1,20 @@
+tasks:
+- install:
+- ceph:
+- openssl_keys:
+- rgw:
+    client.0:
+
+overrides:
+  install:
+    ceph:
+      extra_system_packages:
+        rpm:
+          - java
+        deb:
+          - default-jre
+  ceph:
+    conf:
+      global:
+        osd_min_pg_log_entries: 10
+        osd_max_pg_log_entries: 10
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros b/qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros
new file mode 120000
index 000000000000..46280a42a96d
--- /dev/null
+++ b/qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros
@@ -0,0 +1 @@
+../../.qa/distros/supported-random-distro$/
\ No newline at end of file
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml b/qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml
new file mode 100644
index 000000000000..01d6fc637deb
--- /dev/null
+++ b/qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml
@@ -0,0 +1,8 @@
+tasks:
+- kafka-failover:
+    client.0:
+      kafka_version: 3.8.1
+- notification-tests:
+    client.0:
+      extra_attr: ["kafka_failover"]
+      rgw_server: client.0
diff --git a/qa/tasks/kafka_failover.py b/qa/tasks/kafka_failover.py
new file mode 100644
index 000000000000..3ca60ab84fcf
--- /dev/null
+++ b/qa/tasks/kafka_failover.py
@@ -0,0 +1,244 @@
+"""
+Deploy and configure Kafka for Teuthology
+"""
+import contextlib
+import logging
+import time
+import os
+
+from teuthology import misc as teuthology
+from teuthology import contextutil
+from teuthology.orchestra import run
+
+log = logging.getLogger(__name__)
+
+def get_kafka_version(config):
+    for client, client_config in config.items():
+        if 'kafka_version' in client_config:
+            kafka_version = client_config.get('kafka_version')
+            return kafka_version
+
+kafka_prefix = 'kafka_2.13-'
+
+def get_kafka_dir(ctx, config):
+    kafka_version = get_kafka_version(config)
+    current_version = kafka_prefix + kafka_version
+    return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx), ver=current_version)
+
+
+@contextlib.contextmanager
+def install_kafka(ctx, config):
+    """
+    Downloading the kafka tar file.
+ """ + assert isinstance(config, dict) + log.info('Installing Kafka...') + + # programmatically find a nearby mirror so as not to hammer archive.apache.org + apache_mirror_cmd="curl 'https://www.apache.org/dyn/closer.cgi' 2>/dev/null | " \ + "grep -o '[^<]*' | sed 's/<[^>]*>//g' | head -n 1" + log.info("determining apache mirror by running: " + apache_mirror_cmd) + apache_mirror_url_front = os.popen(apache_mirror_cmd).read().rstrip() # note: includes trailing slash (/) + log.info("chosen apache mirror is " + apache_mirror_url_front) + + for (client, _) in config.items(): + (remote,) = ctx.cluster.only(client).remotes.keys() + test_dir=teuthology.get_testdir(ctx) + current_version = get_kafka_version(config) + + kafka_file = kafka_prefix + current_version + '.tgz' + + link1 = '{apache_mirror_url_front}/kafka/'.format(apache_mirror_url_front=apache_mirror_url_front) + \ + current_version + '/' + kafka_file + ctx.cluster.only(client).run( + args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1], + ) + + ctx.cluster.only(client).run( + args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', kafka_file], + ) + + kafka_dir = get_kafka_dir(ctx, config) + # create config for second broker + second_broker_config_name = "server2.properties" + second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir) + second_broker_data_logs_escaped = "{}/logs".format(second_broker_data).replace("/", "\/") + + ctx.cluster.only(client).run( + args=['cd', '{tdir}'.format(tdir=kafka_dir), run.Raw('&&'), + 'cp', '{tdir}/config/server.properties'.format(tdir=kafka_dir), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), + 'mkdir', '-p', '{tdir}/data'.format(tdir=kafka_dir) + ], + ) + + # edit config + ctx.cluster.only(client).run( + args=['sed', '-i', 's/broker.id=0/broker.id=1/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), + 'sed', '-i', 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), + 'sed', '-i', 's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), + 'sed', '-i', 's/log.dirs=\/tmp\/kafka-logs/log.dirs={}/g'.format(second_broker_data_logs_escaped), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), + 'cat', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name) + ] + ) + + try: + yield + finally: + log.info('Removing packaged dependencies of Kafka...') + test_dir=get_kafka_dir(ctx, config) + current_version = get_kafka_version(config) + for (client,_) in config.items(): + ctx.cluster.only(client).run( + args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)], + ) + + ctx.cluster.only(client).run( + args=['rm', '-rf', test_dir], + ) + + ctx.cluster.only(client).run( + args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=kafka_file)], + ) + + +@contextlib.contextmanager +def run_kafka(ctx,config): + """ + This includes two parts: + 1. Starting Zookeeper service + 2. 
+
+
+@contextlib.contextmanager
+def run_kafka(ctx,config):
+    """
+    This includes two parts:
+    1. Starting Zookeeper service
+    2. Starting Kafka service
+    """
+    assert isinstance(config, dict)
+    log.info('Bringing up Zookeeper and Kafka services...')
+    for (client,_) in config.items():
+        (remote,) = ctx.cluster.only(client).remotes.keys()
+        kafka_dir = get_kafka_dir(ctx, config)
+
+        second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
+        second_broker_java_log_dir = "{}/java_logs".format(second_broker_data)
+
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+                  './zookeeper-server-start.sh',
+                  '{tir}/config/zookeeper.properties'.format(tir=kafka_dir),
+                  run.Raw('&'), 'exit'
+            ],
+        )
+
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+                  './kafka-server-start.sh',
+                  '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
+                  run.Raw('&'), 'exit'
+            ],
+        )
+
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+                  run.Raw('LOG_DIR={second_broker_java_log_dir}'.format(second_broker_java_log_dir=second_broker_java_log_dir)),
+                  './kafka-server-start.sh', '{tdir}/config/server2.properties'.format(tdir=kafka_dir),
+                  run.Raw('&'), 'exit'
+            ],
+        )
+
+    try:
+        yield
+    finally:
+        log.info('Stopping Zookeeper and Kafka Services...')
+
+        for (client, _) in config.items():
+            (remote,) = ctx.cluster.only(client).remotes.keys()
+
+            ctx.cluster.only(client).run(
+                args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+                      './kafka-server-stop.sh',
+                      '{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)),
+                ],
+            )
+
+            time.sleep(5)
+
+            ctx.cluster.only(client).run(
+                args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+                      './zookeeper-server-stop.sh',
+                      '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
+                ],
+            )
+
+            time.sleep(5)
+
+            ctx.cluster.only(client).run(args=['killall', '-9', 'java'])
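Once run_kafka has yielded, the stock broker listens on port 9092 and the second broker on 19092. A quick sanity check could look like this (an illustrative snippet, assuming both brokers run on localhost; not part of the task):

    import socket

    # both brokers should accept TCP connections at this point
    for port in (9092, 19092):
        with socket.create_connection(('localhost', port), timeout=5):
            print('kafka broker on port %d is up' % port)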
+ """ + assert isinstance(config, dict) + log.info('Checking kafka server through producer/consumer commands...') + for (client,_) in config.items(): + (remote,) = ctx.cluster.only(client).remotes.keys() + + ctx.cluster.only(client).run( + args=[ + 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + './kafka-topics.sh', '--create', '--topic', 'quickstart-events', + '--bootstrap-server', 'localhost:9092' + ], + ) + + ctx.cluster.only(client).run( + args=[ + 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + 'echo', "First", run.Raw('|'), + './kafka-console-producer.sh', '--topic', 'quickstart-events', + '--bootstrap-server', 'localhost:9092' + ], + ) + + ctx.cluster.only(client).run( + args=[ + 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + './kafka-console-consumer.sh', '--topic', 'quickstart-events', + '--from-beginning', + '--bootstrap-server', 'localhost:9092', + run.Raw('&'), 'exit' + ], + ) + + try: + yield + finally: + pass + + +@contextlib.contextmanager +def task(ctx,config): + """ + Following is the way how to run kafka:: + tasks: + - kafka: + client.0: + kafka_version: 2.6.0 + """ + assert config is None or isinstance(config, list) \ + or isinstance(config, dict), \ + "task kafka only supports a list or dictionary for configuration" + + all_clients = ['client.{id}'.format(id=id_) + for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')] + if config is None: + config = all_clients + if isinstance(config, list): + config = dict.fromkeys(config) + + log.debug('Kafka config is %s', config) + + with contextutil.nested( + lambda: install_kafka(ctx=ctx, config=config), + lambda: run_kafka(ctx=ctx, config=config), + lambda: run_admin_cmds(ctx=ctx, config=config), + ): + yield + diff --git a/qa/tasks/notification_tests.py b/qa/tasks/notification_tests.py index 7a3a401ab066..c6ecb8b41b78 100644 --- a/qa/tasks/notification_tests.py +++ b/qa/tasks/notification_tests.py @@ -220,7 +220,7 @@ def run_tests(ctx, config): for client, client_config in config.items(): (remote,) = ctx.cluster.only(client).remotes.keys() - attr = ["!kafka_test", "!amqp_test", "!amqp_ssl_test", "!kafka_ssl_test", "!modification_required", "!manual_test"] + attr = ["!kafka_test", "!kafka_failover", "!amqp_test", "!amqp_ssl_test", "!kafka_ssl_test", "!modification_required", "!manual_test"] if 'extra_attr' in client_config: attr = client_config.get('extra_attr') diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc index 05dc9e65d0ea..baec6fe173cb 100644 --- a/src/rgw/driver/rados/rgw_pubsub_push.cc +++ b/src/rgw/driver/rados/rgw_pubsub_push.cc @@ -303,11 +303,12 @@ public: cct(_cct), topic(_topic), ack_level(get_ack_level(args)) { - if (!kafka::connect(conn_name, _endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), - args.get_optional("ca-location"), args.get_optional("mechanism"))) { + if (!kafka::connect(conn_name, _endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), + args.get_optional("ca-location"), args.get_optional("mechanism"), + args.get_optional("kafka-brokers"))) { throw configuration_error("Kafka: failed to create connection to: " + _endpoint); - } - } + } + } int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override { if (ack_level == ack_level_t::None) { diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index 290fde360ff0..1356eeef75ff 100644 --- 
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc
index 290fde360ff0..1356eeef75ff 100644
--- a/src/rgw/rgw_kafka.cc
+++ b/src/rgw/rgw_kafka.cc
@@ -13,6 +13,8 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include "common/dout.h"
@@ -542,12 +544,13 @@ public:
   }
 
   // connect to a broker, or reuse an existing connection if already connected
-  bool connect(std::string& broker,
-      const std::string& url,
-      bool use_ssl,
-      bool verify_ssl,
-      boost::optional<const std::string&> ca_location,
-      boost::optional<const std::string&> mechanism) {
+  bool connect(std::string& broker_list,
+      const std::string& url,
+      bool use_ssl,
+      bool verify_ssl,
+      boost::optional<const std::string&> ca_location,
+      boost::optional<const std::string&> mechanism,
+      boost::optional<const std::string&> brokers) {
     if (stopped) {
       ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
       return false;
@@ -555,7 +558,8 @@
 
     std::string user;
     std::string password;
-    if (!parse_url_authority(url, broker, user, password)) {
+
+    if (!parse_url_authority(url, broker_list, user, password)) {
       // TODO: increment counter
       ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
       return false;
@@ -569,8 +573,13 @@
       return false;
     }
 
+    if (brokers.has_value()) {
+      broker_list.append(",");
+      broker_list.append(brokers.get());
+    }
+
     std::lock_guard lock(connections_lock);
-    const auto it = connections.find(broker);
+    const auto it = connections.find(broker_list);
     // note that ssl vs. non-ssl connections to the same host are two separate connections
     if (it != connections.end()) {
       // connection found - return even if non-ok
@@ -589,7 +598,7 @@
     // in such a case the creation will be retried in the main thread
     ++connection_count;
     ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
-    auto conn = connections.emplace(broker, std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism)).first->second.get();
+    auto conn = connections.emplace(broker_list, std::make_unique<connection_t>(cct, broker_list, use_ssl, verify_ssl, ca_location, user, password, mechanism)).first->second.get();
     if (!new_producer(conn)) {
       ldout(cct, 10) << "Kafka connect: new connection is created. But producer creation failed. will retry" << dendl;
     }
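To make the new behavior concrete: the authority parsed from the kafka:// URL is extended with the comma-separated kafka-brokers value, and the combined string becomes both the broker list handed to the underlying Kafka client and the connection-cache key. A minimal Python sketch of that composition (illustrative only, not the patch's code):

    def compose_broker_list(url_authority, brokers=None):
        """Mimic kafka::connect(): append the optional kafka-brokers value."""
        broker_list = url_authority          # e.g. parsed from kafka://localhost:9092
        if brokers:
            broker_list += ',' + brokers
        return broker_list

    # kafka://localhost:9092 plus kafka-brokers=localhost:19092
    assert compose_broker_list('localhost:9092', 'localhost:19092') == \
        'localhost:9092,localhost:19092'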
@@ -682,11 +691,16 @@ void shutdown() {
   s_manager = nullptr;
 }
 
-bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl,
-    boost::optional<const std::string&> ca_location,
-    boost::optional<const std::string&> mechanism) {
+bool connect(std::string& broker_list,
+    const std::string& url,
+    bool use_ssl,
+    bool verify_ssl,
+    boost::optional<const std::string&> ca_location,
+    boost::optional<const std::string&> mechanism,
+    boost::optional<const std::string&> brokers) {
   if (!s_manager) return false;
-  return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism);
+  return s_manager->connect(broker_list, url, use_ssl, verify_ssl, ca_location,
+                            mechanism, brokers);
 }
 
 int publish(const std::string& conn_name,
diff --git a/src/rgw/rgw_kafka.h b/src/rgw/rgw_kafka.h
index 813fda32969b..3de026a0fb68 100644
--- a/src/rgw/rgw_kafka.h
+++ b/src/rgw/rgw_kafka.h
@@ -22,7 +22,13 @@ bool init(CephContext* cct);
 void shutdown();
 
 // connect to a kafka endpoint
-bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl, boost::optional<const std::string&> ca_location, boost::optional<const std::string&> mechanism);
+bool connect(std::string& broker_list,
+    const std::string& url,
+    bool use_ssl,
+    bool verify_ssl,
+    boost::optional<const std::string&> ca_location,
+    boost::optional<const std::string&> mechanism,
+    boost::optional<const std::string&> brokers);
 
 // publish a message over a connection that was already created
 int publish(const std::string& conn_name,
diff --git a/src/test/rgw/bucket_notification/requirements.txt b/src/test/rgw/bucket_notification/requirements.txt
index a3cff2bedabb..bb74eceedc39 100644
--- a/src/test/rgw/bucket_notification/requirements.txt
+++ b/src/test/rgw/bucket_notification/requirements.txt
@@ -1,4 +1,4 @@
-nose >=1.0.0
+nose-py3 >=1.0.0
 boto >=2.6.0
 boto3 >=1.0.0
 configparser >=5.0.0
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index 8982a50d40c3..a33e47c67da6 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -418,17 +418,25 @@ kafka_server = 'localhost'
 
 class KafkaReceiver(object):
     """class for receiving and storing messages on a topic from the kafka broker"""
-    def __init__(self, topic, security_type):
+    def __init__(self, topic, security_type, kafka_server='localhost'):
         from kafka import KafkaConsumer
         remaining_retries = 10
         port = 9092
         if security_type != 'PLAINTEXT':
             security_type = 'SSL'
             port = 9093
+
+        if kafka_server is None:
+            endpoint = "localhost" + ":" + str(port)
+        elif ":" not in kafka_server:
+            endpoint = kafka_server + ":" + str(port)
+        else:
+            endpoint = kafka_server
+
         while remaining_retries > 0:
             try:
-                self.consumer = KafkaConsumer(topic,
-                                              bootstrap_servers = kafka_server+':'+str(port),
+                self.consumer = KafkaConsumer(topic,
+                                              bootstrap_servers=endpoint,
                                               security_protocol=security_type,
                                               consumer_timeout_ms=16000)
                 print('Kafka consumer created on topic: '+topic)
@@ -467,9 +475,9 @@ def kafka_receiver_thread_runner(receiver):
         print('Kafka receiver ended unexpectedly: ' + str(error))
 
 
-def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
+def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers=None):
     """create kafka receiver and thread"""
-    receiver = KafkaReceiver(topic, security_type)
+    receiver = KafkaReceiver(topic, security_type, kafka_server=kafka_brokers)
     task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
     task.daemon = True
     return task, receiver
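With the new kafka_brokers parameter, a test can point the verifying consumer at an explicit broker instead of the module-level kafka_server default. A usage sketch, mirroring what notification_push below does (the topic name is arbitrary; localhost:19092 is the second test broker started by the teuthology task above):

    # consume from the second test broker only
    task, receiver = create_kafka_receiver_thread('mytopic', kafka_brokers='localhost:19092')
    task.start()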
@@ -1395,6 +1403,139 @@ def test_ps_s3_notification_push_amqp_on_master():
     conn.delete_bucket(bucket_name)
 
 
+def notification_push(endpoint_type, conn, account=None, cloudevents=False, kafka_brokers=None):
+    """ test pushing notifications """
+    zonegroup = get_config_zonegroup()
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    host = get_ip()
+    task = None
+    if endpoint_type == 'http':
+        # create random port for the http server
+        host = get_ip()
+        port = random.randint(10000, 20000)
+        # start an http server in a separate thread
+        receiver = HTTPServerWithEvents((host, port), cloudevents=cloudevents)
+        endpoint_address = 'http://'+host+':'+str(port)
+        if cloudevents:
+            endpoint_args = 'push-endpoint='+endpoint_address+'&cloudevents=true'
+        else:
+            endpoint_args = 'push-endpoint='+endpoint_address
+        topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+        topic_arn = topic_conf.set_config()
+        # create s3 notification
+        notification_name = bucket_name + NOTIFICATION_SUFFIX
+        topic_conf_list = [{'Id': notification_name,
+                            'TopicArn': topic_arn,
+                            'Events': []
+                            }]
+        s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+        response, status = s3_notification_conf.set_config()
+        assert_equal(status/100, 2)
+    elif endpoint_type == 'amqp':
+        # start amqp receiver
+        exchange = 'ex1'
+        task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+        task.start()
+        endpoint_address = 'amqp://' + host
+        # with acks from broker
+        exchange = 'ex1'
+        endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+        # create s3 topic
+        topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+        topic_arn = topic_conf.set_config()
+        # create s3 notification
+        notification_name = bucket_name + NOTIFICATION_SUFFIX
+        topic_conf_list = [{'Id': notification_name,
+                            'TopicArn': topic_arn,
+                            'Events': []
+                            }]
+        s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+        response, status = s3_notification_conf.set_config()
+        assert_equal(status/100, 2)
+    elif endpoint_type == 'kafka':
+        # start kafka receiver
+        task, receiver = create_kafka_receiver_thread(topic_name, kafka_brokers=kafka_brokers)
+        task.start()
+        endpoint_address = 'kafka://' + kafka_server
+        # with acks from broker
+        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+        if kafka_brokers is not None:
+            endpoint_args += '&kafka-brokers=' + kafka_brokers
+        # create s3 topic
+        topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+        topic_arn = topic_conf.set_config()
+        # create s3 notification
+        notification_name = bucket_name + NOTIFICATION_SUFFIX
+        topic_conf_list = [{'Id': notification_name,
+                            'TopicArn': topic_arn,
+                            'Events': []
+                            }]
+        s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+        response, status = s3_notification_conf.set_config()
+        assert_equal(status/100, 2)
+    else:
+        return SkipTest('Unknown endpoint type: ' + endpoint_type)
+
+    # create objects in the bucket
+    number_of_objects = 100
+    if cloudevents:
+        number_of_objects = 10
+    client_threads = []
+    etags = []
+    objects_size = {}
+    start_time = time.time()
+    for i in range(number_of_objects):
+        content = str(os.urandom(1024*1024))
+        etag = hashlib.md5(content.encode()).hexdigest()
+        etags.append(etag)
+        object_size = len(content)
+        key = bucket.new_key(str(i))
+        objects_size[key.name] = object_size
+        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for creation + ' + endpoint_type + ' notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+
+    # check receiver
+    keys = list(bucket.list())
+    receiver.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size, etags=etags)
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target=key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for deletion + ' + endpoint_type + ' notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+
+    # check receiver
+    receiver.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size, etags=etags)
+
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    receiver.close(task)
+
+
 @attr('manual_test')
 def test_ps_s3_notification_push_amqp_idleness_check():
     """ test pushing amqp s3 notification and checking for connection idleness """
@@ -1625,6 +1766,20 @@ def test_ps_s3_notification_push_kafka_on_master():
     stop_kafka_receiver(receiver, task)
 
 
+@attr('kafka_failover')
+def test_notification_push_kafka_multiple_brokers_override():
+    """ test pushing kafka s3 notifications, with the kafka-brokers list repeating the broker from the kafka URI """
+    conn = connection()
+    notification_push('kafka', conn, kafka_brokers='localhost:9092,localhost:19092')
+
+
+@attr('kafka_failover')
+def test_notification_push_kafka_multiple_brokers_append():
+    """ test pushing kafka s3 notifications, appending a second broker to the one from the kafka URI """
+    conn = connection()
+    notification_push('kafka', conn, kafka_brokers='localhost:19092')
+
+
 @attr('http_test')
 def test_ps_s3_notification_multi_delete_on_master():
     """ test deletion of multiple keys on master """