From 447ea3fff9e727891fd930c1e5caa1fd2f2fc71a Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Thu, 3 Jul 2025 16:57:39 +0000 Subject: [PATCH] rgw/notifications: test refactoring * kafka: pass full broker list to consumer in tests * kafka: use ip instead of localhost * kafka: make sure topic exists before consumer start * kafka: fix zookeeper and broker conf in tests * kafka: verify receiver in the test * kafka: tests were not running (Fixes: https://tracker.ceph.com/issues/72240) * kafka: failover tests were failing (Fixes: https://tracker.ceph.com/issues/71585) * simplify basic tests run command * v2 migration tests were not running * fix failing migration tests Signed-off-by: Yuval Lifshitz --- .../notifications/tasks/{others => basic}/+ | 0 .../notifications/tasks/{others => basic}/.qa | 0 .../tasks/{others => basic}/0-install.yaml | 0 .../tasks/{others => basic}/supported-distros | 0 .../test_basic.yaml} | 0 .../notifications/tasks/kafka/test_kafka.yaml | 2 +- qa/tasks/kafka_failover.py | 182 ++++++--- qa/tasks/notification_tests.py | 5 +- src/test/rgw/bucket_notification/api.py | 18 +- src/test/rgw/bucket_notification/test_bn.py | 351 +++++++++++------- 10 files changed, 355 insertions(+), 203 deletions(-) rename qa/suites/rgw/notifications/tasks/{others => basic}/+ (100%) rename qa/suites/rgw/notifications/tasks/{others => basic}/.qa (100%) rename qa/suites/rgw/notifications/tasks/{others => basic}/0-install.yaml (100%) rename qa/suites/rgw/notifications/tasks/{others => basic}/supported-distros (100%) rename qa/suites/rgw/notifications/tasks/{others/test_others.yaml => basic/test_basic.yaml} (100%) diff --git a/qa/suites/rgw/notifications/tasks/others/+ b/qa/suites/rgw/notifications/tasks/basic/+ similarity index 100% rename from qa/suites/rgw/notifications/tasks/others/+ rename to qa/suites/rgw/notifications/tasks/basic/+ diff --git a/qa/suites/rgw/notifications/tasks/others/.qa b/qa/suites/rgw/notifications/tasks/basic/.qa similarity index 100% rename from qa/suites/rgw/notifications/tasks/others/.qa rename to qa/suites/rgw/notifications/tasks/basic/.qa diff --git a/qa/suites/rgw/notifications/tasks/others/0-install.yaml b/qa/suites/rgw/notifications/tasks/basic/0-install.yaml similarity index 100% rename from qa/suites/rgw/notifications/tasks/others/0-install.yaml rename to qa/suites/rgw/notifications/tasks/basic/0-install.yaml diff --git a/qa/suites/rgw/notifications/tasks/others/supported-distros b/qa/suites/rgw/notifications/tasks/basic/supported-distros similarity index 100% rename from qa/suites/rgw/notifications/tasks/others/supported-distros rename to qa/suites/rgw/notifications/tasks/basic/supported-distros diff --git a/qa/suites/rgw/notifications/tasks/others/test_others.yaml b/qa/suites/rgw/notifications/tasks/basic/test_basic.yaml similarity index 100% rename from qa/suites/rgw/notifications/tasks/others/test_others.yaml rename to qa/suites/rgw/notifications/tasks/basic/test_basic.yaml diff --git a/qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml b/qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml index 303f98d540ea4..4407cd3eaccc3 100644 --- a/qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml +++ b/qa/suites/rgw/notifications/tasks/kafka/test_kafka.yaml @@ -4,5 +4,5 @@ tasks: kafka_version: 3.8.1 - notification-tests: client.0: - extra_attr: ["kafka_test", "data_path_v2_kafka_test"] + extra_attr: ["kafka_test"] rgw_server: client.0 diff --git a/qa/tasks/kafka_failover.py b/qa/tasks/kafka_failover.py index 3ca60ab84fcfe..d1cbd72d3568e 100644 --- a/qa/tasks/kafka_failover.py +++ b/qa/tasks/kafka_failover.py @@ -25,6 +25,71 @@ def get_kafka_dir(ctx, config): current_version = kafka_prefix + kafka_version return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx),ver=current_version) +def zookeeper_conf(ctx, client, _id, kafka_dir): + conf = """ + # zookeeper{_id}.properties + dataDir={tdir}/data/zookeeper{_id} + clientPort=218{_id} + maxClientCnxns=0 + admin.enableServer=false + tickTime=2000 + initLimit=10 + syncLimit=5 + server.1=localhost:2888:3888 + server.2=localhost:2889:3889 + """.format(tdir=kafka_dir, _id=_id) + file_name = 'zookeeper{_id}.properties'.format(_id=_id) + log.info("zookeeper conf file: %s", file_name) + log.info(conf) + return ctx.cluster.only(client).run( + args=[ + 'cd', kafka_dir, run.Raw('&&'), + 'mkdir', '-p', 'config', run.Raw('&&'), + 'mkdir', '-p', 'data/zookeeper{_id}'.format(_id=_id), run.Raw('&&'), + 'echo', conf, run.Raw('>'), 'config/{file_name}'.format(file_name=file_name), run.Raw('&&'), + 'echo', str(_id), run.Raw('>'), 'data/zookeeper{_id}/myid'.format(_id=_id) + ], + ) + + +def broker_conf(ctx, client, _id, kafka_dir): + (remote,) = ctx.cluster.only(client).remotes.keys() + conf = """ + # kafka{_id}.properties + broker.id={_id} + listeners=PLAINTEXT://0.0.0.0:909{_id} + advertised.listeners=PLAINTEXT://{ip}:909{_id} + log.dirs={tdir}/data/kafka-logs-{_id} + num.network.threads=3 + num.io.threads=8 + socket.send.buffer.bytes=102400 + socket.receive.buffer.bytes=102400 + socket.request.max.bytes=369295617 + num.partitions=1 + num.recovery.threads.per.data.dir=1 + offsets.topic.replication.factor=2 + transaction.state.log.replication.factor=2 + transaction.state.log.min.isr=2 + log.retention.hours=168 + log.segment.bytes=1073741824 + log.retention.check.interval.ms=300000 + zookeeper.connect=localhost:2181,localhost:2182 + zookeeper.connection.timeout.ms=18000 + group.initial.rebalance.delay.ms=0 + metadata.max.age.ms=3000 + """.format(tdir=kafka_dir, _id=_id, ip=remote.ip_address) + file_name = 'kafka{_id}.properties'.format(_id=_id) + log.info("kafka conf file: %s", file_name) + log.info(conf) + return ctx.cluster.only(client).run( + args=[ + 'cd', kafka_dir, run.Raw('&&'), + 'mkdir', '-p', 'config', run.Raw('&&'), + 'mkdir', '-p', 'data', run.Raw('&&'), + 'echo', conf, run.Raw('>'), 'config/{file_name}'.format(file_name=file_name) + ], + ) + @contextlib.contextmanager def install_kafka(ctx, config): @@ -59,45 +124,21 @@ def install_kafka(ctx, config): ) kafka_dir = get_kafka_dir(ctx, config) - # create config for second broker - second_broker_config_name = "server2.properties" - second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir) - second_broker_data_logs_escaped = "{}/logs".format(second_broker_data).replace("/", "\/") - - ctx.cluster.only(client).run( - args=['cd', '{tdir}'.format(tdir=kafka_dir), run.Raw('&&'), - 'cp', '{tdir}/config/server.properties'.format(tdir=kafka_dir), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), - 'mkdir', '-p', '{tdir}/data'.format(tdir=kafka_dir) - ], - ) - - # edit config - ctx.cluster.only(client).run( - args=['sed', '-i', 's/broker.id=0/broker.id=1/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), - 'sed', '-i', 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), - 'sed', '-i', 's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), - 'sed', '-i', 's/log.dirs=\/tmp\/kafka-logs/log.dirs={}/g'.format(second_broker_data_logs_escaped), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), - 'cat', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name) - ] - ) + # create config for 2 zookeepers + zookeeper_conf(ctx, client, 1, kafka_dir) + zookeeper_conf(ctx, client, 2, kafka_dir) + # create config for 2 brokers + broker_conf(ctx, client, 1, kafka_dir) + broker_conf(ctx, client, 2, kafka_dir) try: yield finally: log.info('Removing packaged dependencies of Kafka...') - test_dir=get_kafka_dir(ctx, config) - current_version = get_kafka_version(config) + kafka_dir=get_kafka_dir(ctx, config) for (client,_) in config.items(): ctx.cluster.only(client).run( - args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)], - ) - - ctx.cluster.only(client).run( - args=['rm', '-rf', test_dir], - ) - - ctx.cluster.only(client).run( - args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=kafka_file)], + args=['rm', '-rf', '{tdir}'.format(tdir=kafka_dir)], ) @@ -114,32 +155,48 @@ def run_kafka(ctx,config): (remote,) = ctx.cluster.only(client).remotes.keys() kafka_dir = get_kafka_dir(ctx, config) - second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir) - second_broker_java_log_dir = "{}/java_logs".format(second_broker_data) - ctx.cluster.only(client).run( args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'), - './zookeeper-server-start.sh', - '{tir}/config/zookeeper.properties'.format(tir=kafka_dir), - run.Raw('&'), 'exit' + './zookeeper-server-start.sh', '-daemon', + '{tdir}/config/zookeeper1.properties'.format(tdir=kafka_dir) + ], + ) + ctx.cluster.only(client).run( + args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'), + './zookeeper-server-start.sh', '-daemon', + '{tdir}/config/zookeeper2.properties'.format(tdir=kafka_dir) ], ) + # wait for zookeepers to start + time.sleep(5) + for zk_id in [1, 2]: + ctx.cluster.only(client).run( + args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'), + './zookeeper-shell.sh', 'localhost:218{_id}'.format(_id=zk_id), 'ls', '/'], + ) + zk_started = False + while not zk_started: + result = ctx.cluster.only(client).run( + args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'), + './zookeeper-shell.sh', 'localhost:218{_id}'.format(_id=zk_id), 'ls', '/'], + ) + log.info("Checking if Zookeeper %d is started. Result: %s", zk_id, str(result)) + zk_started = True ctx.cluster.only(client).run( args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'), - './kafka-server-start.sh', - '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)), - run.Raw('&'), 'exit' + './kafka-server-start.sh', '-daemon', + '{tdir}/config/kafka1.properties'.format(tdir=get_kafka_dir(ctx, config)) ], ) - ctx.cluster.only(client).run( args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'), - run.Raw('LOG_DIR={second_broker_java_log_dir}'.format(second_broker_java_log_dir=second_broker_java_log_dir)), - './kafka-server-start.sh', '{tdir}/config/server2.properties'.format(tdir=kafka_dir), - run.Raw('&'), 'exit' + './kafka-server-start.sh', '-daemon', + '{tdir}/config/kafka2.properties'.format(tdir=get_kafka_dir(ctx, config)) ], ) + # wait for kafka to start + time.sleep(5) try: yield @@ -151,27 +208,41 @@ def run_kafka(ctx,config): ctx.cluster.only(client).run( args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), - './kafka-server-stop.sh', - '{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)), + './kafka-server-stop.sh', + '{tdir}/config/kafka1.properties'.format(tdir=get_kafka_dir(ctx, config)), ], ) + ctx.cluster.only(client).run( + args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + './kafka-server-stop.sh', + '{tdir}/config/kafka2.properties'.format(tdir=get_kafka_dir(ctx, config)), + ], + ) + + # wait for kafka to stop time.sleep(5) ctx.cluster.only(client).run( - args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + './zookeeper-server-stop.sh', + '{tir}/config/zookeeper1.properties'.format(tir=get_kafka_dir(ctx, config)), + ], + ) + ctx.cluster.only(client).run( + args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), './zookeeper-server-stop.sh', - '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)), + '{tir}/config/zookeeper2.properties'.format(tir=get_kafka_dir(ctx, config)), ], ) + # wait for zookeeper to stop time.sleep(5) - ctx.cluster.only(client).run(args=['killall', '-9', 'java']) @contextlib.contextmanager -def run_admin_cmds(ctx,config): +def run_admin_cmds(ctx, config): """ Running Kafka Admin commands in order to check the working of producer anf consumer and creation of topic. """ @@ -182,9 +253,9 @@ def run_admin_cmds(ctx,config): ctx.cluster.only(client).run( args=[ - 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), './kafka-topics.sh', '--create', '--topic', 'quickstart-events', - '--bootstrap-server', 'localhost:9092' + '--bootstrap-server', 'localhost:9091,localhost:9092', ], ) @@ -193,7 +264,7 @@ def run_admin_cmds(ctx,config): 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), 'echo', "First", run.Raw('|'), './kafka-console-producer.sh', '--topic', 'quickstart-events', - '--bootstrap-server', 'localhost:9092' + '--bootstrap-server', 'localhost:9091,localhost:9092', ], ) @@ -202,8 +273,7 @@ def run_admin_cmds(ctx,config): 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), './kafka-console-consumer.sh', '--topic', 'quickstart-events', '--from-beginning', - '--bootstrap-server', 'localhost:9092', - run.Raw('&'), 'exit' + '--bootstrap-server', 'localhost:9091,localhost:9092', '--max-messages', '1', ], ) diff --git a/qa/tasks/notification_tests.py b/qa/tasks/notification_tests.py index f1eae3c89c4e2..cc84b1575a5f1 100644 --- a/qa/tasks/notification_tests.py +++ b/qa/tasks/notification_tests.py @@ -220,7 +220,7 @@ def run_tests(ctx, config): for client, client_config in config.items(): (remote,) = ctx.cluster.only(client).remotes.keys() - attr = ["!kafka_test", "!data_path_v2_kafka_test", "!kafka_failover", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"] + attr = ["basic_test"] if 'extra_attr' in client_config: attr = client_config.get('extra_attr') @@ -291,6 +291,7 @@ def task(ctx,config): endpoint = ctx.rgw.role_endpoints.get(client) assert endpoint, 'bntests: no rgw endpoint for {}'.format(client) + cluster_name, _, _ = teuthology.split_role(client) bntests_conf[client] = ConfigObj( indent_type='', infile={ @@ -299,7 +300,7 @@ def task(ctx,config): 'port':endpoint.port, 'host':endpoint.dns_name, 'zonegroup':ctx.rgw.zonegroup, - 'cluster':'noname', + 'cluster':cluster_name, 'version':'v2' }, 's3 main':{} diff --git a/src/test/rgw/bucket_notification/api.py b/src/test/rgw/bucket_notification/api.py index 63b1ec699779e..1c2c4c54b0422 100644 --- a/src/test/rgw/bucket_notification/api.py +++ b/src/test/rgw/bucket_notification/api.py @@ -247,13 +247,19 @@ def delete_all_topics(conn, tenant, cluster): if tenant == '': topics_result = admin(['topic', 'list'], cluster) topics_json = json.loads(topics_result[0]) - for topic in topics_json: - rm_result = admin(['topic', 'rm', '--topic', topic['name']], cluster) - print(rm_result) + try: + for topic in topics_json['topics']: + admin(['topic', 'rm', '--topic', topic['name']], cluster) + except TypeError: + for topic in topics_json: + admin(['topic', 'rm', '--topic', topic['name']], cluster) else: topics_result = admin(['topic', 'list', '--tenant', tenant], cluster) topics_json = json.loads(topics_result[0]) - for topic in topics_json: - rm_result = admin(['topic', 'rm', '--tenant', tenant, '--topic', topic['name']], cluster) - print(rm_result) + try: + for topic in topics_json['topics']: + admin(['topic', 'rm', '--tenant', tenant, '--topic', topic['name']], cluster) + except TypeError: + for topic in topics_json: + admin(['topic', 'rm', '--tenant', tenant, '--topic', topic['name']], cluster) diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 229dc316481df..e27517ebd2f2c 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -82,6 +82,18 @@ UID_PREFIX = "superman" num_buckets = 0 run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6)) + +def get_ip(): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + # address should not be reachable + s.connect(('10.255.255.255', 1)) + ip = s.getsockname()[0] + finally: + s.close() + return ip + + def gen_bucket_name(): global num_buckets @@ -406,45 +418,76 @@ META_PREFIX = 'x-amz-meta-' # Kafka endpoint functions -kafka_server = 'localhost' +default_kafka_server = get_ip() class KafkaReceiver(object): """class for receiving and storing messages on a topic from the kafka broker""" - def __init__(self, topic, security_type, kafka_server='localhost'): + def __init__(self, topic_name, security_type, kafka_server): from kafka import KafkaConsumer - remaining_retries = 10 + from kafka.admin import KafkaAdminClient, NewTopic + from kafka.errors import TopicAlreadyExistsError + self.status = 'init' port = 9092 if security_type != 'PLAINTEXT': security_type = 'SSL' port = 9093 if kafka_server is None: - endpoint = "localhost" + ":" + str(port) - elif ":" not in kafka_server: + endpoint = default_kafka_server + ":" + str(port) + elif ":" not in kafka_server and len(kafka_server.split(",")) == 1: endpoint = kafka_server + ":" + str(port) else: endpoint = kafka_server + remaining_retries = 10 while remaining_retries > 0: try: - self.consumer = KafkaConsumer(topic, + admin_client = KafkaAdminClient( + bootstrap_servers=endpoint, + request_timeout_ms=16000) + topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1) + admin_client.create_topics([topic]) + log.info('Kafka admin created topic: %s on broker/s: %s', topic_name, endpoint) + break + except Exception as error: + if type(error) == TopicAlreadyExistsError: + log.info('Kafka admin topic %s already exists on broker/s: %s', topic_name, endpoint) + break + remaining_retries -= 1 + log.warning('Kafka admin failed to create topic: %s on broker/s: %s. remaining reties: %d. error: %s', + topic_name, endpoint , remaining_retries, str(error)) + time.sleep(1) + + if remaining_retries == 0: + raise Exception('Kafka admin failed to create topic: %s. no retries left', topic_name) + + remaining_retries = 10 + while remaining_retries > 0: + try: + self.consumer = KafkaConsumer(topic_name, bootstrap_servers=endpoint, security_protocol=security_type, - consumer_timeout_ms=16000, + metadata_max_age_ms=5000, + consumer_timeout_ms=5000, auto_offset_reset='earliest') - print('Kafka consumer created on topic: '+topic) + log.info('Kafka consumer connected to broker/s: %s for topic: %s', endpoint , topic_name) + # This forces the consumer to fetch metadata immediately + partitions = self.consumer.partitions_for_topic(topic) + log.info('Kafka consumer partitions for topic: %s are: %s', topic_name, str(partitions)) + self.consumer.poll(timeout_ms=1000, max_records=1) break except Exception as error: remaining_retries -= 1 - print('failed to connect to kafka (remaining retries ' - + str(remaining_retries) + '): ' + str(error)) + log.warning('Kafka consumer failed to connect to broker/s: %s. for topic: %. remaining reties: %d. error: %s', + endpoint, topic_name, remaining_retries, str(error)) time.sleep(1) if remaining_retries == 0: - raise Exception('failed to connect to kafka - no retries left') + raise Exception('Kafka consumer failed to connect to kafka for topic: %s. no retries left', topic_name) + self.status = 'connected' self.events = [] - self.topic = topic + self.topic = topic_name self.stop = False def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]): @@ -463,17 +506,16 @@ class KafkaReceiver(object): def kafka_receiver_thread_runner(receiver): """main thread function for the kafka receiver""" try: - log.info('Kafka receiver started') - print('Kafka receiver started') + log.info('Kafka receiver for topic: %s started', receiver.topic) + receiver.status = 'running' while not receiver.stop: for msg in receiver.consumer: receiver.events.append(json.loads(msg.value)) time.sleep(0.1) - log.info('Kafka receiver ended') - print('Kafka receiver ended') + log.info('Kafka receiver for topic: %s ended', receiver.topic) except Exception as error: - log.info('Kafka receiver ended unexpectedly: %s', str(error)) - print('Kafka receiver ended unexpectedly: ' + str(error)) + log.info('Kafka receiver for topic: %s ended unexpectedly. error: %s', receiver.topic, str(error)) + receiver.status = 'ended' def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers=None): @@ -486,23 +528,34 @@ def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers def stop_kafka_receiver(receiver, task): """stop the receiver thread and wait for it to finish""" receiver.stop = True - task.join(1) + task.join(5) try: receiver.consumer.unsubscribe() receiver.consumer.close() + log.info('Kafka receiver on topic: %s gracefully stopped', receiver.topic) except Exception as error: - log.info('failed to gracefully stop Kafka receiver: %s', str(error)) - - -def get_ip(): - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - try: - # address should not be reachable - s.connect(('10.255.255.255', 1)) - ip = s.getsockname()[0] - finally: - s.close() - return ip + log.info('Kafka receiver on topic: %s failed to gracefully stop. error: %s', receiver.topic, str(error)) + +def verify_kafka_receiver(receiver): + """test the kafka receiver""" + from kafka import KafkaProducer + producer = KafkaProducer(bootstrap_servers=receiver.consumer.config['bootstrap_servers'], + security_protocol=receiver.consumer.config['security_protocol']) + producer.send(receiver.topic, value=json.dumps({'test': 'message'}).encode('utf-8')) + producer.flush() + events = [] + remaining_retries = 10 + while len(events) == 0: + log.info('Kafka receiver (in "%s" state) waiting for test event (at: %s). remaining retries: %d', + receiver.status, datetime.datetime.now(), remaining_retries) + time.sleep(1) + events = receiver.get_and_reset_events() + remaining_retries -= 1 + if remaining_retries == 0: + raise Exception('kafka receiver on topic: %s did not receive test event in time', receiver.topic) + assert_equal(len(events), 1) + assert_in('test', events[0]) + log.info('Kafka receiver on topic: %s tested ok', receiver.topic) def connection(no_retries=False): @@ -546,8 +599,8 @@ def another_user(user=None, tenant=None, account=None): cmd += ['--account-id', account, '--account-root'] arn = f'arn:aws:iam::{account}:user/Superman' - _, result = admin(cmd, get_config_cluster()) - assert_equal(result, 0) + _, rc = admin(cmd, get_config_cluster()) + assert_equal(rc, 0) conn = S3Connection(aws_access_key_id=access_key, aws_secret_access_key=secret_key, @@ -561,9 +614,15 @@ def list_topics(assert_len=None, tenant=''): result = admin(['topic', 'list'], get_config_cluster()) else: result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster()) + assert_equal(result[1], 0) parsed_result = json.loads(result[0]) - if assert_len: - assert_equal(len(parsed_result), assert_len) + try: + actual_len = len(parsed_result['topics']) + except TypeError: + actual_len = len(parsed_result) + if assert_len and assert_len != actual_len: + log.error(parsed_result) + assert 'expected %d topics, got %d' % (assert_len, actual_len) return parsed_result @@ -580,7 +639,7 @@ def get_stats_persistent_topic(topic_name, assert_entries_number=None): log.warning('Topic dump:') for entry in parsed_result: log.warning(entry) - assert_equal(actual_number, assert_entries_number) + assert 'expected %d entries, got %d' % (assert_entries_number, actual_number) return parsed_result @@ -611,13 +670,16 @@ def list_notifications(bucket_name, assert_len=None, tenant=''): result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster()) else: result = admin(['notification', 'list', '--bucket', bucket_name, '--tenant', tenant], get_config_cluster()) + assert_equal(result[1], 0) parsed_result = json.loads(result[0]) - if assert_len: - assert_equal(len(parsed_result['notifications']), assert_len) + actual_len = len(parsed_result['notifications']) + if assert_len and assert_len != actual_len: + log.error(parsed_result) + assert 'expected %d notifications, got %d' % (assert_len, actual_len) return parsed_result -def get_notification(bucket_name, notification_name, tenant=''): +def get_notification(bucket_name, notification_name, tenant=''): if tenant == '': result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name], get_config_cluster()) else: @@ -661,10 +723,10 @@ def connect_random_user(tenant=''): secret_key = str(time.time()) uid = UID_PREFIX + str(time.time()) if tenant == '': - _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster()) + _, rc = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster()) else: - _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster()) - assert_equal(result, 0) + _, rc = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster()) + assert_equal(rc, 0) conn = S3Connection(aws_access_key_id=access_key, aws_secret_access_key=secret_key, is_secure=False, port=get_config_port(), host=get_config_host(), @@ -684,6 +746,7 @@ def test_ps_s3_topic_on_master(): conn = connect_random_user(tenant) # make sure there are no leftover topics + delete_all_topics(conn, '', get_config_cluster()) delete_all_topics(conn, tenant, get_config_cluster()) zonegroup = get_config_zonegroup() @@ -747,6 +810,7 @@ def test_ps_s3_topic_admin_on_master(): conn = connect_random_user(tenant) # make sure there are no leftover topics + delete_all_topics(conn, '', get_config_cluster()) delete_all_topics(conn, tenant, get_config_cluster()) zonegroup = get_config_zonegroup() @@ -792,6 +856,7 @@ def test_ps_s3_topic_admin_on_master(): result = admin( ['topic', 'get', '--topic', topic_name + '_3', '--tenant', tenant], get_config_cluster()) + assert_equal(result[1], 0) parsed_result = json.loads(result[0]) assert_equal(parsed_result['arn'], topic_arn3) assert_true(all([x in parsed_result['owner'] for x in matches])) @@ -1374,10 +1439,14 @@ def notification_push(endpoint_type, conn, account=None, cloudevents=False, kafk response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) elif endpoint_type == 'kafka': - # start amqp receiver + # start kafka receiver + default_kafka_server_and_port = default_kafka_server + ':9092' + if kafka_brokers is not None: + kafka_brokers = kafka_brokers + ',' + default_kafka_server_and_port task, receiver = create_kafka_receiver_thread(topic_name, kafka_brokers=kafka_brokers) task.start() - endpoint_address = 'kafka://' + kafka_server + verify_kafka_receiver(receiver) + endpoint_address = 'kafka://' + default_kafka_server_and_port # without acks from broker endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker' if kafka_brokers is not None: @@ -1603,14 +1672,14 @@ def test_notification_push_kafka(): def test_notification_push_kafka_multiple_brokers_override(): """ test pushing kafka s3 notification on master """ conn = connection() - notification_push('kafka', conn, kafka_brokers='localhost:9092,localhost:19092') + notification_push('kafka', conn, kafka_brokers='{host}:9091,{host}:9092'.format(host=default_kafka_server)) @attr('kafka_failover') def test_notification_push_kafka_multiple_brokers_append(): """ test pushing kafka s3 notification on master """ conn = connection() - notification_push('kafka', conn, kafka_brokers='localhost:19092') + notification_push('kafka', conn, kafka_brokers='{host}:9091'.format(host=default_kafka_server)) @attr('http_test') @@ -1796,6 +1865,7 @@ def lifecycle(endpoint_type, conn, number_of_objects, topic_events, create_threa # start kafka receiver task, receiver = create_kafka_receiver_thread(topic_name) task.start() + verify_kafka_receiver(receiver) endpoint_address = 'kafka://' + host endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true' else: @@ -2286,6 +2356,7 @@ def multipart_endpoint_agnostic(endpoint_type, conn): # start amqp receiver task, receiver = create_kafka_receiver_thread(topic_name) task.start() + verify_kafka_receiver(receiver) endpoint_address = 'kafka://' + host endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker' else: @@ -2380,6 +2451,7 @@ def metadata_filter(endpoint_type, conn): # start kafka receiver task, receiver = create_kafka_receiver_thread(topic_name) task.start() + verify_kafka_receiver(receiver) endpoint_address = 'kafka://' + host endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true' else: @@ -3044,6 +3116,7 @@ def persistent_topic_stats(conn, endpoint_type): # start kafka receiver task, receiver = create_kafka_receiver_thread(topic_name) task.start() + verify_kafka_receiver(receiver) endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \ '&retry_sleep_duration=1' else: @@ -3159,7 +3232,7 @@ def test_persistent_topic_dump(): host = get_ip() task, receiver = create_kafka_receiver_thread(topic_name) task.start() - + verify_kafka_receiver(receiver) # create s3 topic endpoint_address = 'kafka://WrongHost' # wrong port @@ -3211,7 +3284,6 @@ def test_persistent_topic_dump(): # topic stats result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster()) assert_equal(result[1], 0) - print(result[0]) parsed_result = json.loads(result[0]) assert_equal(len(parsed_result), 2*number_of_objects) @@ -3452,9 +3524,10 @@ def test_ps_s3_notification_kafka_idle_behaviour(): task, receiver = create_kafka_receiver_thread(topic_name+'_1') task.start() + verify_kafka_receiver(receiver) # create s3 topic - endpoint_address = 'kafka://' + kafka_server + endpoint_address = 'kafka://' + default_kafka_server # with acks from broker endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker' topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) @@ -3799,6 +3872,7 @@ def persistent_topic_multiple_endpoints(conn, endpoint_type): # start kafka receiver task, receiver = create_kafka_receiver_thread(topic_name_1) task.start() + verify_kafka_receiver(receiver) endpoint_address = 'kafka://' + host endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \ '&retry_sleep_duration=1' @@ -3911,6 +3985,7 @@ def persistent_notification(endpoint_type, conn, account=None): # start kafka receiver task, receiver = create_kafka_receiver_thread(topic_name) task.start() + verify_kafka_receiver(receiver) endpoint_address = 'kafka://' + host endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'+'&persistent=true' else: @@ -3988,8 +4063,8 @@ def test_ps_s3_persistent_notification_http_account(): account = 'RGW77777777777777777' user = UID_PREFIX + 'test' - _, result = admin(['account', 'create', '--account-id', account, '--account-name', 'testacct'], get_config_cluster()) - assert_true(result in [0, 17]) # EEXIST okay if we rerun + _, rc = admin(['account', 'create', '--account-id', account, '--account-name', 'testacct'], get_config_cluster()) + assert_true(rc in [0, 17]) # EEXIST okay if we rerun conn, _ = another_user(user=user, account=account) try: @@ -4405,7 +4480,7 @@ def test_ps_s3_multiple_topics_notification(): http_server.close() -@attr('data_path_v2_test') +@attr('basic_test') def test_ps_s3_list_topics_migration(): """ test list topics on migration""" if get_config_cluster() == 'noname': @@ -4571,7 +4646,7 @@ def test_ps_s3_list_topics(): tenant_topic_conf.del_config(tenant_topic_arn1) tenant_topic_conf.del_config(tenant_topic_arn2) -@attr('data_path_v2_test') +@attr('basic_test') def test_ps_s3_list_topics_v1(): """ test list topics on v1""" if get_config_cluster() == 'noname': @@ -4851,13 +4926,13 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F # create s3 topic if security_type == 'SASL_SSL': if not use_topic_attrs_for_creds: - endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094' + endpoint_address = 'kafka://alice:alice-secret@' + default_kafka_server + ':9094' else: - endpoint_address = 'kafka://' + kafka_server + ':9094' + endpoint_address = 'kafka://' + default_kafka_server + ':9094' elif security_type == 'SSL': - endpoint_address = 'kafka://' + kafka_server + ':9093' + endpoint_address = 'kafka://' + default_kafka_server + ':9093' elif security_type == 'SASL_PLAINTEXT': - endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9095' + endpoint_address = 'kafka://alice:alice-secret@' + default_kafka_server + ':9095' else: assert False, 'unknown security method '+security_type @@ -4877,6 +4952,7 @@ def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=F # create consumer on the topic task, receiver = create_kafka_receiver_thread(topic_name) task.start() + verify_kafka_receiver(receiver) topic_arn = topic_conf.set_config() # create s3 notification @@ -5064,6 +5140,24 @@ def test_persistent_ps_s3_reload(): http_server.close() +def poll_on_topic(topic_name, tenant=''): + remaining_retries = 10 + start_time = time.time() + while True: + result = remove_topic(topic_name, tenant, allow_failure=True) + time_diff = time.time() - start_time + if result == 0: + log.info('migration took %d seconds', time_diff) + return + elif result == 154: + if remaining_retries == 0: + assert False, 'migration did not end after %d seconds' % time_diff + remaining_retries -= 1 + log.info('migration in process. remaining retries: %d', remaining_retries) + time.sleep(2) + else: + assert False, 'unexpected error (%d) trying to remove topic when waiting for migration to end' % result + def persistent_data_path_v2_migration(conn, endpoint_type): """ test data path v2 persistent migration """ if get_config_cluster() == 'noname': @@ -5101,6 +5195,7 @@ def persistent_data_path_v2_migration(conn, endpoint_type): # start kafka receiver task, receiver = create_kafka_receiver_thread(topic_name) task.start() + verify_kafka_receiver(receiver) endpoint_address = 'kafka://' + host endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \ '&retry_sleep_duration=1' @@ -5149,10 +5244,7 @@ def persistent_data_path_v2_migration(conn, endpoint_type): zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2) # poll on topic_1 - result = 1 - while result != 0: - time.sleep(1) - result = remove_topic(topic_name_1, allow_failure=True) + poll_on_topic(topic_name_1) # topic stats get_stats_persistent_topic(topic_name, number_of_objects) @@ -5197,21 +5289,21 @@ def persistent_data_path_v2_migration(conn, endpoint_type): receiver.close(task) -@attr('data_path_v2_test') +@attr('http_test') def persistent_data_path_v2_migration_http(): """ test data path v2 persistent migration, http endpoint """ conn = connection() persistent_data_path_v2_migration(conn, 'http') -@attr('data_path_v2_kafka_test') +@attr('kafka_test') def persistent_data_path_v2_migration_kafka(): """ test data path v2 persistent migration, kafka endpoint """ conn = connection() persistent_data_path_v2_migration(conn, 'kafka') -@attr('data_path_v2_test') +@attr('http_test') def test_ps_s3_data_path_v2_migration(): """ test data path v2 migration """ if get_config_cluster() == 'noname': @@ -5263,60 +5355,53 @@ def test_ps_s3_data_path_v2_migration(): time_diff = time.time() - start_time print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') - try: - # verify events - keys = list(bucket.list()) - http_server.verify_s3_events(keys, exact_match=True) + # verify events + keys = list(bucket.list()) + http_server.verify_s3_events(keys, exact_match=True) - # create topic to poll on - topic_name_1 = topic_name + '_1' - topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args) + # create topic to poll on + topic_name_1 = topic_name + '_1' + topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args) - # enable v2 notification - zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2) + # enable v2 notification + zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2) - # poll on topic_1 - result = 1 - while result != 0: - time.sleep(1) - result = remove_topic(topic_name_1, allow_failure=True) + # poll on topic_1 + poll_on_topic(topic_name_1) - # create more objects in the bucket (async) - client_threads = [] - start_time = time.time() - for i in range(number_of_objects): - key = bucket.new_key('key-'+str(i)) - content = str(os.urandom(1024*1024)) - thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) - thr.start() - client_threads.append(thr) - [thr.join() for thr in client_threads] - time_diff = time.time() - start_time - print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + # create more objects in the bucket (async) + client_threads = [] + start_time = time.time() + for i in range(number_of_objects): + key = bucket.new_key('key-'+str(i)) + content = str(os.urandom(1024*1024)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + time_diff = time.time() - start_time + print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') - # verify events - keys = list(bucket.list()) - http_server.verify_s3_events(keys, exact_match=True) + # verify events + keys = list(bucket.list()) + http_server.verify_s3_events(keys, exact_match=True) - except Exception as e: - assert False, str(e) - finally: - # cleanup - s3_notification_conf.del_config() - topic_conf.del_config() - # delete objects from the bucket - client_threads = [] - for key in bucket.list(): - thr = threading.Thread(target = key.delete, args=()) - thr.start() - client_threads.append(thr) - [thr.join() for thr in client_threads] - # delete the bucket - conn.delete_bucket(bucket_name) - http_server.close() + # cleanup + s3_notification_conf.del_config() + topic_conf.del_config() + # delete objects from the bucket + client_threads = [] + for key in bucket.list(): + thr = threading.Thread(target = key.delete, args=()) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + # delete the bucket + conn.delete_bucket(bucket_name) + http_server.close() -@attr('data_path_v2_test') +@attr('basic_test') def test_ps_s3_data_path_v2_large_migration(): """ test data path v2 large migration """ if get_config_cluster() == 'noname': @@ -5384,17 +5469,12 @@ def test_ps_s3_data_path_v2_large_migration(): # enable v2 notification zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2) - # poll on topic_1 - for tenant, topic_conf in zip(tenants_list, polling_topics_conf): - while True: - result = remove_topic(topic_conf.topic_name, tenant, allow_failure=True) - - if result != 0: - print('migration in process... error: '+str(result)) - else: - break + for tenant in tenants_list: + list_topics(1, tenant) - time.sleep(1) + # poll on topic + for tenant, topic_conf in zip(tenants_list, polling_topics_conf): + poll_on_topic(topic_conf.topic_name, tenant) # check if we migrated all the topics for tenant in tenants_list: @@ -5402,7 +5482,7 @@ def test_ps_s3_data_path_v2_large_migration(): # check if we migrated all the notifications for tenant, bucket in zip(tenants_list, buckets_list): - list_notifications(bucket.name, num_of_s3_notifications) + list_notifications(bucket.name, num_of_s3_notifications, tenant) # cleanup for s3_notification_conf in s3_notification_conf_list: @@ -5414,7 +5494,7 @@ def test_ps_s3_data_path_v2_large_migration(): conn.delete_bucket(bucket.name) -@attr('data_path_v2_test') +@attr('basic_test') def test_ps_s3_data_path_v2_mixed_migration(): """ test data path v2 mixed migration """ if get_config_cluster() == 'noname': @@ -5509,17 +5589,9 @@ def test_ps_s3_data_path_v2_mixed_migration(): # enable v2 notification zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2) - # poll on topic_1 + # poll on topic for tenant, topic_conf in zip(tenants_list, polling_topics_conf): - while True: - result = remove_topic(topic_conf.topic_name, tenant, allow_failure=True) - - if result != 0: - print(result) - else: - break - - time.sleep(1) + poll_on_topic(topic_conf.topic_name, tenant) # check if we migrated all the topics for tenant in tenants_list: @@ -5527,7 +5599,7 @@ def test_ps_s3_data_path_v2_mixed_migration(): # check if we migrated all the notifications for tenant, bucket in zip(tenants_list, buckets_list): - list_notifications(bucket.name, 2) + list_notifications(bucket.name, 2, tenant) # cleanup for s3_notification_conf in s3_notification_conf_list: @@ -5551,8 +5623,9 @@ def test_notification_caching(): # start kafka receiver task, receiver = create_kafka_receiver_thread(topic_name) task.start() - incorrect_port = 8080 - endpoint_address = 'kafka://' + kafka_server + ':' + str(incorrect_port) + verify_kafka_receiver(receiver) + incorrect_port = 9999 + endpoint_address = 'kafka://' + default_kafka_server + ':' + str(incorrect_port) endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker' + '&persistent=true' # create s3 topic @@ -5609,7 +5682,7 @@ def test_notification_caching(): assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects) # remove the port and update the topic, so its pointing to correct endpoint. - endpoint_address = 'kafka://' + kafka_server + endpoint_address = 'kafka://' + default_kafka_server # update s3 topic topic_conf.set_attributes(attribute_name="push-endpoint", attribute_val=endpoint_address) @@ -5638,9 +5711,11 @@ def test_connection_caching(): # start kafka receiver task_1, receiver_1 = create_kafka_receiver_thread(topic_name_1) task_1.start() + verify_kafka_receiver(receiver_1) task_2, receiver_2 = create_kafka_receiver_thread(topic_name_2) task_2.start() - endpoint_address = 'kafka://' + kafka_server + verify_kafka_receiver(receiver_2) + endpoint_address = 'kafka://' + default_kafka_server endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&use-ssl=true' + '&persistent=true' # initially create both s3 topics with `use-ssl=true` @@ -5702,7 +5777,7 @@ def test_connection_caching(): assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects) # remove the ssl from topic1 and update the topic. - endpoint_address = 'kafka://' + kafka_server + endpoint_address = 'kafka://' + default_kafka_server topic_conf_1.set_attributes(attribute_name="use-ssl", attribute_val="false") keys = list(bucket.list()) -- 2.39.5