"""
Deploy and configure Kafka for Teuthology
"""
-from io import BytesIO
-from io import StringIO
-from configobj import ConfigObj
-import base64
-import argparse
import contextlib
import logging
-import os
-import random
-import six
-import string
-import subprocess
-import json
-import sys
-from pathlib import Path
-
-from collections import OrderedDict
-from itertools import chain
from teuthology import misc as teuthology
from teuthology import contextutil
-from teuthology.config import config as teuth_config
from teuthology.orchestra import run
-from teuthology.packaging import install_package
-from teuthology.packaging import remove_package
from teuthology.exceptions import ConfigError
log = logging.getLogger(__name__)
-def get_kafka_dir(ctx):
- return '{tdir}/kafka-2.6.0-src'.format(tdir=teuthology.get_testdir(ctx))
+def get_kafka_version(config):
+ for client, client_config in config.items():
+ if 'kafka_version' in client_config:
+ kafka_version = client_config.get('kafka_version')
+ return kafka_version
-def run_in_kafka_dir(ctx, client, args, **kwargs):
- return ctx.cluster.only(client).run(
- args=[ 'cd', get_kafka_dir(ctx), run.Raw('&&'), ] + args,
- **kwargs
- )
+def get_kafka_dir(ctx, config):
+ kafka_version = get_kafka_version(config)
+ current_version = 'kafka-' + kafka_version + '-src'
+ return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx),ver=current_version)
def get_toxvenv_dir(ctx):
return ctx.tox.venv_path
for (client, _) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
test_dir=teuthology.get_testdir(ctx)
- toxvenv_sh(ctx, remote, ['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', 'https://archive.apache.org/dist/kafka/2.6.0/kafka-2.6.0-src.tgz'])
- toxvenv_sh(ctx, remote, ['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', 'kafka-2.6.0-src.tgz'])
+ current_version = get_kafka_version(config)
+
+ link1 = 'https://archive.apache.org/dist/kafka/' + current_version + '/kafka-' + current_version + '-src.tgz'
+ toxvenv_sh(ctx, remote, ['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1])
+
+ file1 = 'kafka-' + current_version + '-src.tgz'
+ toxvenv_sh(ctx, remote, ['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', file1])
try:
yield
finally:
log.info('Removing packaged dependencies of Kafka...')
- test_dir=get_kafka_dir(ctx)
+ test_dir=get_kafka_dir(ctx, config)
+ current_version = get_kafka_version(config)
for client in config:
ctx.cluster.only(client).run(
args=['rm', '-rf', test_dir],
)
+ rmfile1 = 'kafka-' + current_version + '-src.tgz'
+ ctx.cluster.only(client).run(
+ args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=rmfile1)],
+ )
+
@contextlib.contextmanager
def run_kafka(ctx,config):
(remote,) = ctx.cluster.only(client).remotes.keys()
toxvenv_sh(ctx, remote,
- ['cd', '{tdir}'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'),
+ ['cd', '{tdir}'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./gradlew', 'jar',
'-PscalaVersion=2.13.2'
],
)
toxvenv_sh(ctx, remote,
- ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'),
+ ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./zookeeper-server-start.sh',
- '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx)),
+ '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
run.Raw('&'), 'exit'
],
)
toxvenv_sh(ctx, remote,
- ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'),
+ ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-server-start.sh',
- '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx)),
+ '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
run.Raw('&'), 'exit'
],
)
(remote,) = ctx.cluster.only(client).remotes.keys()
toxvenv_sh(ctx, remote,
- ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'),
+ ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-server-stop.sh',
- '{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx)),
+ '{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)),
]
)
toxvenv_sh(ctx, remote,
- ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'),
+ ['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./zookeeper-server-stop.sh',
- '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx)),
+ '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
]
)
toxvenv_sh(ctx, remote,
[
- 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'),
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-topics.sh', '--create', '--topic', 'quickstart-events',
'--bootstrap-server', 'localhost:9092'
])
toxvenv_sh(ctx, remote,
[
- 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'),
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'echo', "First", run.Raw('|'),
'./kafka-console-producer.sh', '--topic', 'quickstart-events',
'--bootstrap-server', 'localhost:9092'
toxvenv_sh(ctx, remote,
[
- 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx)), run.Raw('&&'),
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
'./kafka-console-consumer.sh', '--topic', 'quickstart-events',
'--from-beginning',
'--bootstrap-server', 'localhost:9092',
config = all_clients
if isinstance(config, list):
config = dict.fromkeys(config)
- clients=config.keys()
log.debug('Kafka config is %s', config)