current_version = kafka_prefix + kafka_version
return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx), ver=current_version)
+def zookeeper_conf(ctx, client, _id, kafka_dir):
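+ """write zookeeper{_id}.properties for a two-node ensemble on one host, and create its data dir with the matching 'myid' file"""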
+ conf = """
+ # zookeeper{_id}.properties
+ dataDir={tdir}/data/zookeeper{_id}
+ clientPort=218{_id}
+ maxClientCnxns=0
+ admin.enableServer=false
+ tickTime=2000
+ initLimit=10
+ syncLimit=5
+ server.1=localhost:2888:3888
+ server.2=localhost:2889:3889
+ """.format(tdir=kafka_dir, _id=_id)
+ file_name = 'zookeeper{_id}.properties'.format(_id=_id)
+ log.info("zookeeper conf file: %s", file_name)
+ log.info(conf)
+ return ctx.cluster.only(client).run(
+ args=[
+ 'cd', kafka_dir, run.Raw('&&'),
+ 'mkdir', '-p', 'config', run.Raw('&&'),
+ 'mkdir', '-p', 'data/zookeeper{_id}'.format(_id=_id), run.Raw('&&'),
+ 'echo', conf, run.Raw('>'), 'config/{file_name}'.format(file_name=file_name), run.Raw('&&'),
+ 'echo', str(_id), run.Raw('>'), 'data/zookeeper{_id}/myid'.format(_id=_id)
+ ],
+ )
+
+
+def broker_conf(ctx, client, _id, kafka_dir):
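+ """write kafka{_id}.properties for a broker listening on port 909{_id}, advertising the remote's IP and connecting to both local zookeepers"""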
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+ conf = """
+ # kafka{_id}.properties
+ broker.id={_id}
+ listeners=PLAINTEXT://0.0.0.0:909{_id}
+ advertised.listeners=PLAINTEXT://{ip}:909{_id}
+ log.dirs={tdir}/data/kafka-logs-{_id}
+ num.network.threads=3
+ num.io.threads=8
+ socket.send.buffer.bytes=102400
+ socket.receive.buffer.bytes=102400
+ socket.request.max.bytes=369295617
+ num.partitions=1
+ num.recovery.threads.per.data.dir=1
+ offsets.topic.replication.factor=2
+ transaction.state.log.replication.factor=2
+ transaction.state.log.min.isr=2
+ log.retention.hours=168
+ log.segment.bytes=1073741824
+ log.retention.check.interval.ms=300000
+ zookeeper.connect=localhost:2181,localhost:2182
+ zookeeper.connection.timeout.ms=18000
+ group.initial.rebalance.delay.ms=0
+ metadata.max.age.ms=3000
+ """.format(tdir=kafka_dir, _id=_id, ip=remote.ip_address)
+ file_name = 'kafka{_id}.properties'.format(_id=_id)
+ log.info("kafka conf file: %s", file_name)
+ log.info(conf)
+ return ctx.cluster.only(client).run(
+ args=[
+ 'cd', kafka_dir, run.Raw('&&'),
+ 'mkdir', '-p', 'config', run.Raw('&&'),
+ 'mkdir', '-p', 'data', run.Raw('&&'),
+ 'echo', conf, run.Raw('>'), 'config/{file_name}'.format(file_name=file_name)
+ ],
+ )
+
@contextlib.contextmanager
def install_kafka(ctx, config):
)
kafka_dir = get_kafka_dir(ctx, config)
- # create config for second broker
- second_broker_config_name = "server2.properties"
- second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
- second_broker_data_logs_escaped = "{}/logs".format(second_broker_data).replace("/", "\/")
-
- ctx.cluster.only(client).run(
- args=['cd', '{tdir}'.format(tdir=kafka_dir), run.Raw('&&'),
- 'cp', '{tdir}/config/server.properties'.format(tdir=kafka_dir), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
- 'mkdir', '-p', '{tdir}/data'.format(tdir=kafka_dir)
- ],
- )
-
- # edit config
- ctx.cluster.only(client).run(
- args=['sed', '-i', 's/broker.id=0/broker.id=1/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
- 'sed', '-i', 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
- 'sed', '-i', 's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
- 'sed', '-i', 's/log.dirs=\/tmp\/kafka-logs/log.dirs={}/g'.format(second_broker_data_logs_escaped), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
- 'cat', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name)
- ]
- )
+ # create config for 2 zookeepers
+ zookeeper_conf(ctx, client, 1, kafka_dir)
+ zookeeper_conf(ctx, client, 2, kafka_dir)
+ # create config for 2 brokers
+ broker_conf(ctx, client, 1, kafka_dir)
+ broker_conf(ctx, client, 2, kafka_dir)
try:
yield
finally:
log.info('Removing packaged dependencies of Kafka...')
- test_dir=get_kafka_dir(ctx, config)
- current_version = get_kafka_version(config)
+ kafka_dir = get_kafka_dir(ctx, config)
for (client,_) in config.items():
ctx.cluster.only(client).run(
- args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)],
- )
-
- ctx.cluster.only(client).run(
- args=['rm', '-rf', test_dir],
- )
-
- ctx.cluster.only(client).run(
- args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=kafka_file)],
+ args=['rm', '-rf', kafka_dir],
)
(remote,) = ctx.cluster.only(client).remotes.keys()
kafka_dir = get_kafka_dir(ctx, config)
- second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
- second_broker_java_log_dir = "{}/java_logs".format(second_broker_data)
-
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
- './zookeeper-server-start.sh',
- '{tir}/config/zookeeper.properties'.format(tir=kafka_dir),
- run.Raw('&'), 'exit'
+ './zookeeper-server-start.sh', '-daemon',
+ '{tdir}/config/zookeeper1.properties'.format(tdir=kafka_dir)
+ ],
+ )
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+ './zookeeper-server-start.sh', '-daemon',
+ '{tdir}/config/zookeeper2.properties'.format(tdir=kafka_dir)
],
)
+ # wait for zookeepers to start
+ time.sleep(5)
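+ # verify each zookeeper answers on its client port (a failed run raises and fails the task)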
+ for zk_id in [1, 2]:
+ result = ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+ './zookeeper-shell.sh', 'localhost:218{_id}'.format(_id=zk_id), 'ls', '/'],
+ )
+ log.info("Checking if Zookeeper %d is started. Result: %s", zk_id, str(result))
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
- './kafka-server-start.sh',
- '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
- run.Raw('&'), 'exit'
+ './kafka-server-start.sh', '-daemon',
+ '{tdir}/config/kafka1.properties'.format(tdir=get_kafka_dir(ctx, config))
],
)
-
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
- run.Raw('LOG_DIR={second_broker_java_log_dir}'.format(second_broker_java_log_dir=second_broker_java_log_dir)),
- './kafka-server-start.sh', '{tdir}/config/server2.properties'.format(tdir=kafka_dir),
- run.Raw('&'), 'exit'
+ './kafka-server-start.sh', '-daemon',
+ '{tdir}/config/kafka2.properties'.format(tdir=get_kafka_dir(ctx, config))
],
)
+ # wait for kafka to start
+ time.sleep(5)
try:
yield
ctx.cluster.only(client).run(
args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
- './kafka-server-stop.sh',
- '{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)),
+ './kafka-server-stop.sh',
+ '{tdir}/config/kafka1.properties'.format(tdir=get_kafka_dir(ctx, config)),
],
)
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './kafka-server-stop.sh',
+ '{tdir}/config/kafka2.properties'.format(tdir=get_kafka_dir(ctx, config)),
+ ],
+ )
+
+ # wait for kafka to stop
time.sleep(5)
ctx.cluster.only(client).run(
- args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './zookeeper-server-stop.sh',
+ '{tdir}/config/zookeeper1.properties'.format(tdir=get_kafka_dir(ctx, config)),
+ ],
+ )
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./zookeeper-server-stop.sh',
- '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
+ '{tdir}/config/zookeeper2.properties'.format(tdir=get_kafka_dir(ctx, config)),
],
)
+ # wait for zookeeper to stop
time.sleep(5)
-
ctx.cluster.only(client).run(args=['killall', '-9', 'java'])
@contextlib.contextmanager
-def run_admin_cmds(ctx,config):
+def run_admin_cmds(ctx, config):
"""
Running Kafka admin commands in order to check topic creation and the working of producer and consumer.
"""
ctx.cluster.only(client).run(
args=[
- 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-topics.sh', '--create', '--topic', 'quickstart-events',
- '--bootstrap-server', 'localhost:9092'
+ '--bootstrap-server', 'localhost:9091,localhost:9092',
],
)
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'echo', "First", run.Raw('|'),
'./kafka-console-producer.sh', '--topic', 'quickstart-events',
- '--bootstrap-server', 'localhost:9092'
+ '--bootstrap-server', 'localhost:9091,localhost:9092',
],
)
'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-console-consumer.sh', '--topic', 'quickstart-events',
'--from-beginning',
- '--bootstrap-server', 'localhost:9092',
- run.Raw('&'), 'exit'
+ '--bootstrap-server', 'localhost:9091,localhost:9092', '--max-messages', '1',
],
)
num_buckets = 0
run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
+
+def get_ip():
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
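+ # connecting a UDP socket sends no packets; it only selects the local interface that would route to the target, whose address we then read back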
+ try:
+ # address should not be reachable
+ s.connect(('10.255.255.255', 1))
+ ip = s.getsockname()[0]
+ finally:
+ s.close()
+ return ip
+
+
def gen_bucket_name():
global num_buckets
# Kafka endpoint functions
-kafka_server = 'localhost'
+default_kafka_server = get_ip()
class KafkaReceiver(object):
"""class for receiving and storing messages on a topic from the kafka broker"""
- def __init__(self, topic, security_type, kafka_server='localhost'):
+ def __init__(self, topic_name, security_type, kafka_server):
from kafka import KafkaConsumer
- remaining_retries = 10
+ from kafka.admin import KafkaAdminClient, NewTopic
+ from kafka.errors import TopicAlreadyExistsError
+ self.status = 'init'
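+ # receiver lifecycle: 'init' -> 'connected' -> 'running' -> 'ended'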
port = 9092
if security_type != 'PLAINTEXT':
security_type = 'SSL'
port = 9093
if kafka_server is None:
- endpoint = "localhost" + ":" + str(port)
- elif ":" not in kafka_server:
+ endpoint = default_kafka_server + ":" + str(port)
+ elif ":" not in kafka_server and len(kafka_server.split(",")) == 1:
endpoint = kafka_server + ":" + str(port)
else:
endpoint = kafka_server
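+ # pre-create the topic with the admin client so the consumer can subscribe to it before any notification is pushed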
+ remaining_retries = 10
while remaining_retries > 0:
try:
- self.consumer = KafkaConsumer(topic,
+ admin_client = KafkaAdminClient(
+ bootstrap_servers=endpoint,
+ request_timeout_ms=16000)
+ topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
+ admin_client.create_topics([topic])
+ log.info('Kafka admin created topic: %s on broker/s: %s', topic_name, endpoint)
+ break
+ except Exception as error:
+ if isinstance(error, TopicAlreadyExistsError):
+ log.info('Kafka admin topic %s already exists on broker/s: %s', topic_name, endpoint)
+ break
+ remaining_retries -= 1
+ log.warning('Kafka admin failed to create topic: %s on broker/s: %s. remaining retries: %d. error: %s',
+ topic_name, endpoint, remaining_retries, str(error))
+ time.sleep(1)
+
+ if remaining_retries == 0:
+ raise Exception('Kafka admin failed to create topic: %s. no retries left' % topic_name)
+
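+ # now connect a consumer to the topic, retrying while the broker comes up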
+ remaining_retries = 10
+ while remaining_retries > 0:
+ try:
+ self.consumer = KafkaConsumer(topic_name,
bootstrap_servers=endpoint,
security_protocol=security_type,
- consumer_timeout_ms=16000,
+ metadata_max_age_ms=5000,
+ consumer_timeout_ms=5000,
auto_offset_reset='earliest')
- print('Kafka consumer created on topic: '+topic)
+ log.info('Kafka consumer connected to broker/s: %s for topic: %s', endpoint, topic_name)
+ # This forces the consumer to fetch metadata immediately
+ partitions = self.consumer.partitions_for_topic(topic_name)
+ log.info('Kafka consumer partitions for topic: %s are: %s', topic_name, str(partitions))
+ self.consumer.poll(timeout_ms=1000, max_records=1)
break
except Exception as error:
remaining_retries -= 1
- print('failed to connect to kafka (remaining retries '
- + str(remaining_retries) + '): ' + str(error))
+ log.warning('Kafka consumer failed to connect to broker/s: %s for topic: %s. remaining retries: %d. error: %s',
+ endpoint, topic_name, remaining_retries, str(error))
time.sleep(1)
if remaining_retries == 0:
- raise Exception('failed to connect to kafka - no retries left')
+ raise Exception('Kafka consumer failed to connect to kafka for topic: %s. no retries left' % topic_name)
+ self.status = 'connected'
self.events = []
- self.topic = topic
+ self.topic = topic_name
self.stop = False
def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
def kafka_receiver_thread_runner(receiver):
"""main thread function for the kafka receiver"""
try:
- log.info('Kafka receiver started')
- print('Kafka receiver started')
+ log.info('Kafka receiver for topic: %s started', receiver.topic)
+ receiver.status = 'running'
while not receiver.stop:
for msg in receiver.consumer:
receiver.events.append(json.loads(msg.value))
time.sleep(0.1)
- log.info('Kafka receiver ended')
- print('Kafka receiver ended')
+ log.info('Kafka receiver for topic: %s ended', receiver.topic)
except Exception as error:
- log.info('Kafka receiver ended unexpectedly: %s', str(error))
- print('Kafka receiver ended unexpectedly: ' + str(error))
+ log.info('Kafka receiver for topic: %s ended unexpectedly. error: %s', receiver.topic, str(error))
+ receiver.status = 'ended'
def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers=None):
def stop_kafka_receiver(receiver, task):
"""stop the receiver thread and wait for it to finish"""
receiver.stop = True
- task.join(1)
+ task.join(5)
try:
receiver.consumer.unsubscribe()
receiver.consumer.close()
+ log.info('Kafka receiver on topic: %s gracefully stopped', receiver.topic)
except Exception as error:
- log.info('failed to gracefully stop Kafka receiver: %s', str(error))
-
-
-def get_ip():
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- try:
- # address should not be reachable
- s.connect(('10.255.255.255', 1))
- ip = s.getsockname()[0]
- finally:
- s.close()
- return ip
+ log.info('Kafka receiver on topic: %s failed to gracefully stop. error: %s', receiver.topic, str(error))
+
+def verify_kafka_receiver(receiver):
+ """test the kafka receiver"""
+ from kafka import KafkaProducer
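+ # produce a single test event using the same bootstrap servers and security settings as the consumer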
+ producer = KafkaProducer(bootstrap_servers=receiver.consumer.config['bootstrap_servers'],
+ security_protocol=receiver.consumer.config['security_protocol'])
+ producer.send(receiver.topic, value=json.dumps({'test': 'message'}).encode('utf-8'))
+ producer.flush()
+ events = []
+ remaining_retries = 10
+ while len(events) == 0:
+ log.info('Kafka receiver (in "%s" state) waiting for test event (at: %s). remaining retries: %d',
+ receiver.status, datetime.datetime.now(), remaining_retries)
+ time.sleep(1)
+ events = receiver.get_and_reset_events()
+ remaining_retries -= 1
+ if remaining_retries == 0:
+ raise Exception('Kafka receiver on topic: %s did not receive test event in time' % receiver.topic)
+ assert_equal(len(events), 1)
+ assert_in('test', events[0])
+ log.info('Kafka receiver on topic: %s tested ok', receiver.topic)
def connection(no_retries=False):
cmd += ['--account-id', account, '--account-root']
arn = f'arn:aws:iam::{account}:user/Superman'
- _, result = admin(cmd, get_config_cluster())
- assert_equal(result, 0)
+ _, rc = admin(cmd, get_config_cluster())
+ assert_equal(rc, 0)
conn = S3Connection(aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
result = admin(['topic', 'list'], get_config_cluster())
else:
result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
+ assert_equal(result[1], 0)
parsed_result = json.loads(result[0])
- if assert_len:
- assert_equal(len(parsed_result), assert_len)
+ try:
+ actual_len = len(parsed_result['topics'])
+ except TypeError:
+ actual_len = len(parsed_result)
+ if assert_len and assert_len != actual_len:
+ log.error(parsed_result)
+ assert False, 'expected %d topics, got %d' % (assert_len, actual_len)
return parsed_result
log.warning('Topic dump:')
for entry in parsed_result:
log.warning(entry)
- assert_equal(actual_number, assert_entries_number)
+ assert False, 'expected %d entries, got %d' % (assert_entries_number, actual_number)
return parsed_result
result = admin(['notification', 'list', '--bucket', bucket_name], get_config_cluster())
else:
result = admin(['notification', 'list', '--bucket', bucket_name, '--tenant', tenant], get_config_cluster())
+ assert_equal(result[1], 0)
parsed_result = json.loads(result[0])
- if assert_len:
- assert_equal(len(parsed_result['notifications']), assert_len)
+ actual_len = len(parsed_result['notifications'])
+ if assert_len and assert_len != actual_len:
+ log.error(parsed_result)
+ assert False, 'expected %d notifications, got %d' % (assert_len, actual_len)
return parsed_result
-def get_notification(bucket_name, notification_name, tenant=''):
+def get_notification(bucket_name, notification_name, tenant=''):
if tenant == '':
result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name], get_config_cluster())
else:
secret_key = str(time.time())
uid = UID_PREFIX + str(time.time())
if tenant == '':
- _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
+ _, rc = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
else:
- _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
- assert_equal(result, 0)
+ _, rc = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
+ assert_equal(rc, 0)
conn = S3Connection(aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
is_secure=False, port=get_config_port(), host=get_config_host(),
conn = connect_random_user(tenant)
# make sure there are no leftover topics
+ delete_all_topics(conn, '', get_config_cluster())
delete_all_topics(conn, tenant, get_config_cluster())
zonegroup = get_config_zonegroup()
conn = connect_random_user(tenant)
# make sure there are no leftover topics
+ delete_all_topics(conn, '', get_config_cluster())
delete_all_topics(conn, tenant, get_config_cluster())
zonegroup = get_config_zonegroup()
result = admin(
['topic', 'get', '--topic', topic_name + '_3', '--tenant', tenant],
get_config_cluster())
+ assert_equal(result[1], 0)
parsed_result = json.loads(result[0])
assert_equal(parsed_result['arn'], topic_arn3)
assert_true(all([x in parsed_result['owner'] for x in matches]))
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
elif endpoint_type == 'kafka':
- # start amqp receiver
+ # start kafka receiver
+ default_kafka_server_and_port = default_kafka_server + ':9092'
+ if kafka_brokers is not None:
+ kafka_brokers = kafka_brokers + ',' + default_kafka_server_and_port
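+ # the receiver listens on the default broker, so it is appended to any explicit broker list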
task, receiver = create_kafka_receiver_thread(topic_name, kafka_brokers=kafka_brokers)
task.start()
- endpoint_address = 'kafka://' + kafka_server
+ verify_kafka_receiver(receiver)
+ endpoint_address = 'kafka://' + default_kafka_server_and_port
# without acks from broker
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
if kafka_brokers is not None:
def test_notification_push_kafka_multiple_brokers_override():
""" test pushing kafka s3 notification on master """
conn = connection()
- notification_push('kafka', conn, kafka_brokers='localhost:9092,localhost:19092')
+ notification_push('kafka', conn, kafka_brokers='{host}:9091,{host}:9092'.format(host=default_kafka_server))
@attr('kafka_failover')
def test_notification_push_kafka_multiple_brokers_append():
""" test pushing kafka s3 notification on master """
conn = connection()
- notification_push('kafka', conn, kafka_brokers='localhost:19092')
+ notification_push('kafka', conn, kafka_brokers='{host}:9091'.format(host=default_kafka_server))
@attr('http_test')
# start kafka receiver
task, receiver = create_kafka_receiver_thread(topic_name)
task.start()
+ verify_kafka_receiver(receiver)
endpoint_address = 'kafka://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
else:
# start amqp receiver
task, receiver = create_kafka_receiver_thread(topic_name)
task.start()
+ verify_kafka_receiver(receiver)
endpoint_address = 'kafka://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
else:
# start kafka receiver
task, receiver = create_kafka_receiver_thread(topic_name)
task.start()
+ verify_kafka_receiver(receiver)
endpoint_address = 'kafka://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
else:
# start kafka receiver
task, receiver = create_kafka_receiver_thread(topic_name)
task.start()
+ verify_kafka_receiver(receiver)
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
'&retry_sleep_duration=1'
else:
host = get_ip()
task, receiver = create_kafka_receiver_thread(topic_name)
task.start()
-
+ verify_kafka_receiver(receiver)
# create s3 topic
endpoint_address = 'kafka://WrongHost' # wrong host
# topic stats
result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
assert_equal(result[1], 0)
- print(result[0])
parsed_result = json.loads(result[0])
assert_equal(len(parsed_result), 2*number_of_objects)
task, receiver = create_kafka_receiver_thread(topic_name+'_1')
task.start()
+ verify_kafka_receiver(receiver)
# create s3 topic
- endpoint_address = 'kafka://' + kafka_server
+ endpoint_address = 'kafka://' + default_kafka_server
# with acks from broker
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
# start kafka receiver
task, receiver = create_kafka_receiver_thread(topic_name_1)
task.start()
+ verify_kafka_receiver(receiver)
endpoint_address = 'kafka://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
'&retry_sleep_duration=1'
# start kafka receiver
task, receiver = create_kafka_receiver_thread(topic_name)
task.start()
+ verify_kafka_receiver(receiver)
endpoint_address = 'kafka://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'+'&persistent=true'
else:
account = 'RGW77777777777777777'
user = UID_PREFIX + 'test'
- _, result = admin(['account', 'create', '--account-id', account, '--account-name', 'testacct'], get_config_cluster())
- assert_true(result in [0, 17]) # EEXIST okay if we rerun
+ _, rc = admin(['account', 'create', '--account-id', account, '--account-name', 'testacct'], get_config_cluster())
+ assert_true(rc in [0, 17]) # EEXIST okay if we rerun
conn, _ = another_user(user=user, account=account)
try:
http_server.close()
-@attr('data_path_v2_test')
+@attr('basic_test')
def test_ps_s3_list_topics_migration():
""" test list topics on migration"""
if get_config_cluster() == 'noname':
tenant_topic_conf.del_config(tenant_topic_arn1)
tenant_topic_conf.del_config(tenant_topic_arn2)
-@attr('data_path_v2_test')
+@attr('basic_test')
def test_ps_s3_list_topics_v1():
""" test list topics on v1"""
if get_config_cluster() == 'noname':
# create s3 topic
if security_type == 'SASL_SSL':
if not use_topic_attrs_for_creds:
- endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
+ endpoint_address = 'kafka://alice:alice-secret@' + default_kafka_server + ':9094'
else:
- endpoint_address = 'kafka://' + kafka_server + ':9094'
+ endpoint_address = 'kafka://' + default_kafka_server + ':9094'
elif security_type == 'SSL':
- endpoint_address = 'kafka://' + kafka_server + ':9093'
+ endpoint_address = 'kafka://' + default_kafka_server + ':9093'
elif security_type == 'SASL_PLAINTEXT':
- endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9095'
+ endpoint_address = 'kafka://alice:alice-secret@' + default_kafka_server + ':9095'
else:
assert False, 'unknown security method '+security_type
# create consumer on the topic
task, receiver = create_kafka_receiver_thread(topic_name)
task.start()
+ verify_kafka_receiver(receiver)
topic_arn = topic_conf.set_config()
# create s3 notification
http_server.close()
+def poll_on_topic(topic_name, tenant=''):
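+ """poll 'topic rm' until migration ends: rc 0 means the topic was removed, rc 154 means migration is still in progress"""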
+ remaining_retries = 10
+ start_time = time.time()
+ while True:
+ result = remove_topic(topic_name, tenant, allow_failure=True)
+ time_diff = time.time() - start_time
+ if result == 0:
+ log.info('migration took %d seconds', time_diff)
+ return
+ elif result == 154:
+ if remaining_retries == 0:
+ assert False, 'migration did not end after %d seconds' % time_diff
+ remaining_retries -= 1
+ log.info('migration in progress. remaining retries: %d', remaining_retries)
+ time.sleep(2)
+ else:
+ assert False, 'unexpected error (%d) trying to remove topic when waiting for migration to end' % result
+
def persistent_data_path_v2_migration(conn, endpoint_type):
""" test data path v2 persistent migration """
if get_config_cluster() == 'noname':
# start kafka receiver
task, receiver = create_kafka_receiver_thread(topic_name)
task.start()
+ verify_kafka_receiver(receiver)
endpoint_address = 'kafka://' + host
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'+ \
'&retry_sleep_duration=1'
zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
# poll on topic_1
- result = 1
- while result != 0:
- time.sleep(1)
- result = remove_topic(topic_name_1, allow_failure=True)
+ poll_on_topic(topic_name_1)
# topic stats
get_stats_persistent_topic(topic_name, number_of_objects)
receiver.close(task)
-@attr('data_path_v2_test')
+@attr('http_test')
def persistent_data_path_v2_migration_http():
""" test data path v2 persistent migration, http endpoint """
conn = connection()
persistent_data_path_v2_migration(conn, 'http')
-@attr('data_path_v2_kafka_test')
+@attr('kafka_test')
def persistent_data_path_v2_migration_kafka():
""" test data path v2 persistent migration, kafka endpoint """
conn = connection()
persistent_data_path_v2_migration(conn, 'kafka')
-@attr('data_path_v2_test')
+@attr('http_test')
def test_ps_s3_data_path_v2_migration():
""" test data path v2 migration """
if get_config_cluster() == 'noname':
time_diff = time.time() - start_time
print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
- try:
- # verify events
- keys = list(bucket.list())
- http_server.verify_s3_events(keys, exact_match=True)
+ # verify events
+ keys = list(bucket.list())
+ http_server.verify_s3_events(keys, exact_match=True)
- # create topic to poll on
- topic_name_1 = topic_name + '_1'
- topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
+ # create topic to poll on
+ topic_name_1 = topic_name + '_1'
+ topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
- # enable v2 notification
- zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
+ # enable v2 notification
+ zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
- # poll on topic_1
- result = 1
- while result != 0:
- time.sleep(1)
- result = remove_topic(topic_name_1, allow_failure=True)
+ # poll on topic_1
+ poll_on_topic(topic_name_1)
- # create more objects in the bucket (async)
- client_threads = []
- start_time = time.time()
- for i in range(number_of_objects):
- key = bucket.new_key('key-'+str(i))
- content = str(os.urandom(1024*1024))
- thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
- thr.start()
- client_threads.append(thr)
- [thr.join() for thr in client_threads]
- time_diff = time.time() - start_time
- print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+ # create more objects in the bucket (async)
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key('key-'+str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
- # verify events
- keys = list(bucket.list())
- http_server.verify_s3_events(keys, exact_match=True)
+ # verify events
+ keys = list(bucket.list())
+ http_server.verify_s3_events(keys, exact_match=True)
- except Exception as e:
- assert False, str(e)
- finally:
- # cleanup
- s3_notification_conf.del_config()
- topic_conf.del_config()
- # delete objects from the bucket
- client_threads = []
- for key in bucket.list():
- thr = threading.Thread(target = key.delete, args=())
- thr.start()
- client_threads.append(thr)
- [thr.join() for thr in client_threads]
- # delete the bucket
- conn.delete_bucket(bucket_name)
- http_server.close()
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete objects from the bucket
+ client_threads = []
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ http_server.close()
-@attr('data_path_v2_test')
+@attr('basic_test')
def test_ps_s3_data_path_v2_large_migration():
""" test data path v2 large migration """
if get_config_cluster() == 'noname':
# enable v2 notification
zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
- # poll on topic_1
- for tenant, topic_conf in zip(tenants_list, polling_topics_conf):
- while True:
- result = remove_topic(topic_conf.topic_name, tenant, allow_failure=True)
-
- if result != 0:
- print('migration in process... error: '+str(result))
- else:
- break
+ for tenant in tenants_list:
+ list_topics(1, tenant)
- time.sleep(1)
+ # poll on topic
+ for tenant, topic_conf in zip(tenants_list, polling_topics_conf):
+ poll_on_topic(topic_conf.topic_name, tenant)
# check if we migrated all the topics
for tenant in tenants_list:
# check if we migrated all the notifications
for tenant, bucket in zip(tenants_list, buckets_list):
- list_notifications(bucket.name, num_of_s3_notifications)
+ list_notifications(bucket.name, num_of_s3_notifications, tenant)
# cleanup
for s3_notification_conf in s3_notification_conf_list:
conn.delete_bucket(bucket.name)
-@attr('data_path_v2_test')
+@attr('basic_test')
def test_ps_s3_data_path_v2_mixed_migration():
""" test data path v2 mixed migration """
if get_config_cluster() == 'noname':
# enable v2 notification
zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
- # poll on topic_1
+ # poll on topic
for tenant, topic_conf in zip(tenants_list, polling_topics_conf):
- while True:
- result = remove_topic(topic_conf.topic_name, tenant, allow_failure=True)
-
- if result != 0:
- print(result)
- else:
- break
-
- time.sleep(1)
+ poll_on_topic(topic_conf.topic_name, tenant)
# check if we migrated all the topics
for tenant in tenants_list:
# check if we migrated all the notifications
for tenant, bucket in zip(tenants_list, buckets_list):
- list_notifications(bucket.name, 2)
+ list_notifications(bucket.name, 2, tenant)
# cleanup
for s3_notification_conf in s3_notification_conf_list:
# start kafka receiver
task, receiver = create_kafka_receiver_thread(topic_name)
task.start()
- incorrect_port = 8080
- endpoint_address = 'kafka://' + kafka_server + ':' + str(incorrect_port)
+ verify_kafka_receiver(receiver)
+ incorrect_port = 9999
+ endpoint_address = 'kafka://' + default_kafka_server + ':' + str(incorrect_port)
endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker' + '&persistent=true'
# create s3 topic
assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
# remove the port and update the topic, so its pointing to correct endpoint.
- endpoint_address = 'kafka://' + kafka_server
+ endpoint_address = 'kafka://' + default_kafka_server
# update s3 topic
topic_conf.set_attributes(attribute_name="push-endpoint",
attribute_val=endpoint_address)
# start kafka receiver
task_1, receiver_1 = create_kafka_receiver_thread(topic_name_1)
task_1.start()
+ verify_kafka_receiver(receiver_1)
task_2, receiver_2 = create_kafka_receiver_thread(topic_name_2)
task_2.start()
- endpoint_address = 'kafka://' + kafka_server
+ verify_kafka_receiver(receiver_2)
+ endpoint_address = 'kafka://' + default_kafka_server
endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&use-ssl=true' + '&persistent=true'
# initially create both s3 topics with `use-ssl=true`
assert_equal(parsed_result['Topic Stats']['Entries'], 2 * number_of_objects)
# remove the ssl from topic1 and update the topic.
- endpoint_address = 'kafka://' + kafka_server
+ endpoint_address = 'kafka://' + default_kafka_server
topic_conf_1.set_attributes(attribute_name="use-ssl",
attribute_val="false")
keys = list(bucket.list())