[&Attributes.entry.13.key=max_retries&Attributes.entry.13.value=<retries number>]
[&Attributes.entry.14.key=retry_sleep_duration&Attributes.entry.14.value=<sleep seconds>]
[&Attributes.entry.15.key=Policy&Attributes.entry.15.value=<policy-JSON-string>]
+ [&Attributes.entry.18.key=kafka-brokers&Attributes.entry.18.value=<kafka-broker-list>]
Request parameters:
- "broker": Messages are considered "delivered" if acked by the broker. (This
is the default.)
+ - kafka-brokers: A comma-separated list of host:port entries of Kafka brokers. These brokers (which may include the broker defined in the kafka URI) are added to the kafka URI to support sending notifications to a Kafka cluster (see the example below).
+
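+For example, a topic whose push endpoint names a single broker can list the
+remaining brokers of a (hypothetical) three-broker cluster via ``kafka-brokers``,
+so that delivery can fail over if the endpoint broker becomes unavailable::
+
+  [&Attributes.entry.1.key=push-endpoint&Attributes.entry.1.value=kafka://kafka1:9092]
+  [&Attributes.entry.2.key=kafka-brokers&Attributes.entry.2.value=kafka2:9092,kafka3:9092]
+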
.. note::
- The key-value pair of a specific parameter need not reside in the same
--- /dev/null
+tasks:
+- install:
+- ceph:
+- openssl_keys:
+- rgw:
+ client.0:
+
+overrides:
+ install:
+ ceph:
+ extra_system_packages:
+ rpm:
+ - java
+ deb:
+ - default-jre
+ ceph:
+ conf:
+ global:
+ osd_min_pg_log_entries: 10
+ osd_max_pg_log_entries: 10
--- /dev/null
+../../.qa/distros/supported-random-distro$/
\ No newline at end of file
--- /dev/null
+tasks:
+- kafka-failover:
+ client.0:
+ kafka_version: 3.8.1
+- notification-tests:
+ client.0:
+ extra_attr: ["kafka_failover"]
+ rgw_server: client.0
--- /dev/null
+"""
+Deploy and configure Kafka for Teuthology
+"""
+import contextlib
+import logging
+import time
+import os
+
+from teuthology import misc as teuthology
+from teuthology import contextutil
+from teuthology.orchestra import run
+
+log = logging.getLogger(__name__)
+
+def get_kafka_version(config):
+ for client, client_config in config.items():
+ if client_config and 'kafka_version' in client_config:
+ kafka_version = client_config.get('kafka_version')
+ return kafka_version
+
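+# Kafka tarballs are named kafka_<scala-version>-<kafka-version>; the
+# binaries used here are built against Scala 2.13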
+kafka_prefix = 'kafka_2.13-'
+
+def get_kafka_dir(ctx, config):
+ kafka_version = get_kafka_version(config)
+ current_version = kafka_prefix + kafka_version
+ return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx), ver=current_version)
+
+
+@contextlib.contextmanager
+def install_kafka(ctx, config):
+ """
+ Downloading the kafka tar file.
+ """
+ assert isinstance(config, dict)
+ log.info('Installing Kafka...')
+
+ # programmatically find a nearby mirror so as not to hammer archive.apache.org
+ apache_mirror_cmd="curl 'https://www.apache.org/dyn/closer.cgi' 2>/dev/null | " \
+ "grep -o '<strong>[^<]*</strong>' | sed 's/<[^>]*>//g' | head -n 1"
+ log.info("determining apache mirror by running: " + apache_mirror_cmd)
+ apache_mirror_url_front = os.popen(apache_mirror_cmd).read().rstrip() # note: includes trailing slash (/)
+ log.info("chosen apache mirror is " + apache_mirror_url_front)
+
+ for (client, _) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+ test_dir=teuthology.get_testdir(ctx)
+ current_version = get_kafka_version(config)
+
+ kafka_file = kafka_prefix + current_version + '.tgz'
+
+ link1 = '{apache_mirror_url_front}/kafka/'.format(apache_mirror_url_front=apache_mirror_url_front) + \
+ current_version + '/' + kafka_file
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', kafka_file],
+ )
+
+ kafka_dir = get_kafka_dir(ctx, config)
+ # create config for second broker
+ second_broker_config_name = "server2.properties"
+ second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
+ # escape the slashes so the path can be used in a sed replacement below
+ second_broker_data_logs_escaped = "{}/logs".format(second_broker_data).replace("/", "\\/")
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}'.format(tdir=kafka_dir), run.Raw('&&'),
+ 'cp', '{tdir}/config/server.properties'.format(tdir=kafka_dir), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+ 'mkdir', '-p', '{tdir}/data'.format(tdir=kafka_dir)
+ ],
+ )
+
+ # edit config
+ ctx.cluster.only(client).run(
+ args=['sed', '-i', 's/broker.id=0/broker.id=1/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+ 'sed', '-i', r's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+ 'sed', '-i', r's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+ 'sed', '-i', r's/log.dirs=\/tmp\/kafka-logs/log.dirs={}/g'.format(second_broker_data_logs_escaped), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+ 'cat', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name)
+ ]
+ )
+
+ try:
+ yield
+ finally:
+ log.info('Removing Kafka installation...')
+ kafka_dir = get_kafka_dir(ctx, config)
+ kafka_file = kafka_prefix + get_kafka_version(config) + '.tgz'
+ for (client,_) in config.items():
+ ctx.cluster.only(client).run(
+ args=['rm', '-rf', '{tdir}/logs'.format(tdir=kafka_dir)],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['rm', '-rf', kafka_dir],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=kafka_file)],
+ )
+
+
+@contextlib.contextmanager
+def run_kafka(ctx, config):
+ """
+ This includes two parts:
+ 1. Starting the Zookeeper service
+ 2. Starting two Kafka broker instances (server.properties and server2.properties)
+ """
+ assert isinstance(config, dict)
+ log.info('Bringing up Zookeeper and Kafka services...')
+ for (client,_) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+ kafka_dir = get_kafka_dir(ctx, config)
+
+ second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
+ second_broker_java_log_dir = "{}/java_logs".format(second_broker_data)
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+ './zookeeper-server-start.sh',
+ '{tdir}/config/zookeeper.properties'.format(tdir=kafka_dir),
+ run.Raw('&'), 'exit'
+ ],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+ './kafka-server-start.sh',
+ '{tdir}/config/server.properties'.format(tdir=kafka_dir),
+ run.Raw('&'), 'exit'
+ ],
+ )
+
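+ # start the second broker from server2.properties, with its JVM logs
+ # redirected to the broker's own data directory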
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+ run.Raw('LOG_DIR={second_broker_java_log_dir}'.format(second_broker_java_log_dir=second_broker_java_log_dir)),
+ './kafka-server-start.sh', '{tdir}/config/server2.properties'.format(tdir=kafka_dir),
+ run.Raw('&'), 'exit'
+ ],
+ )
+
+ try:
+ yield
+ finally:
+ log.info('Stopping Zookeeper and Kafka Services...')
+
+ for (client, _) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './kafka-server-stop.sh',
+ '{tdir}/config/server.properties'.format(tdir=get_kafka_dir(ctx, config)),
+ ],
+ )
+
+ time.sleep(5)
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './zookeeper-server-stop.sh',
+ '{tdir}/config/zookeeper.properties'.format(tdir=get_kafka_dir(ctx, config)),
+ ],
+ )
+
+ time.sleep(5)
+
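+ # kill any broker/zookeeper JVMs that survived the stop scripts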
+ ctx.cluster.only(client).run(args=['killall', '-9', 'java'])
+
+
+@contextlib.contextmanager
+def run_admin_cmds(ctx, config):
+ """
+ Run Kafka admin commands to verify topic creation and the producer and consumer paths.
+ """
+ assert isinstance(config, dict)
+ log.info('Checking kafka server through producer/consumer commands...')
+ for (client,_) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+
+ ctx.cluster.only(client).run(
+ args=[
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './kafka-topics.sh', '--create', '--topic', 'quickstart-events',
+ '--bootstrap-server', 'localhost:9092'
+ ],
+ )
+
+ ctx.cluster.only(client).run(
+ args=[
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ 'echo', "First", run.Raw('|'),
+ './kafka-console-producer.sh', '--topic', 'quickstart-events',
+ '--bootstrap-server', 'localhost:9092'
+ ],
+ )
+
+ ctx.cluster.only(client).run(
+ args=[
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './kafka-console-consumer.sh', '--topic', 'quickstart-events',
+ '--from-beginning',
+ '--bootstrap-server', 'localhost:9092',
+ run.Raw('&'), 'exit'
+ ],
+ )
+
+ try:
+ yield
+ finally:
+ pass
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+ """
+ Following is an example of how to run the kafka-failover task::
+ tasks:
+ - kafka-failover:
+ client.0:
+ kafka_version: 2.6.0
+ """
+ assert config is None or isinstance(config, list) \
+ or isinstance(config, dict), \
+ "task kafka-failover only supports a list or dictionary for configuration"
+
+ all_clients = ['client.{id}'.format(id=id_)
+ for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
+ if config is None:
+ config = all_clients
+ if isinstance(config, list):
+ config = dict.fromkeys(config)
+
+ log.debug('Kafka config is %s', config)
+
+ with contextutil.nested(
+ lambda: install_kafka(ctx=ctx, config=config),
+ lambda: run_kafka(ctx=ctx, config=config),
+ lambda: run_admin_cmds(ctx=ctx, config=config),
+ ):
+ yield
+
for client, client_config in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
- attr = ["!kafka_test", "!amqp_test", "!amqp_ssl_test", "!kafka_ssl_test", "!modification_required", "!manual_test"]
+ attr = ["!kafka_test", "!kafka_failover", "!amqp_test", "!amqp_ssl_test", "!kafka_ssl_test", "!modification_required", "!manual_test"]
if 'extra_attr' in client_config:
attr = client_config.get('extra_attr')
cct(_cct),
topic(_topic),
ack_level(get_ack_level(args)) {
- if (!kafka::connect(conn_name, _endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true),
- args.get_optional("ca-location"), args.get_optional("mechanism"))) {
+ if (!kafka::connect(conn_name, _endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true),
+ args.get_optional("ca-location"), args.get_optional("mechanism"),
+ args.get_optional("kafka-brokers"))) {
throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
- }
- }
+ }
+ }
int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
if (ack_level == ack_level_t::None) {
#include <thread>
#include <atomic>
#include <mutex>
+#include <boost/algorithm/string.hpp>
+#include <boost/functional/hash.hpp>
#include <boost/lockfree/queue.hpp>
#include "common/dout.h"
}
// connect to a broker, or reuse an existing connection if already connected
- bool connect(std::string& broker,
- const std::string& url,
- bool use_ssl,
- bool verify_ssl,
- boost::optional<const std::string&> ca_location,
- boost::optional<const std::string&> mechanism) {
+ bool connect(std::string& broker_list,
+ const std::string& url,
+ bool use_ssl,
+ bool verify_ssl,
+ boost::optional<const std::string&> ca_location,
+ boost::optional<const std::string&> mechanism,
+ boost::optional<const std::string&> brokers) {
if (stopped) {
ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
return false;
std::string user;
std::string password;
- if (!parse_url_authority(url, broker, user, password)) {
+
+ if (!parse_url_authority(url, broker_list, user, password)) {
// TODO: increment counter
ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
return false;
return false;
}
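+ // append any extra brokers to the list parsed from the URL; the combined
+ // list is used as the connection key and stored with the connection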
+ if (brokers.has_value()) {
+ broker_list.append(",");
+ broker_list.append(brokers.get());
+ }
+
std::lock_guard lock(connections_lock);
- const auto it = connections.find(broker);
+ const auto it = connections.find(broker_list);
// note that ssl vs. non-ssl connections to the same host are two separate connections
if (it != connections.end()) {
// connection found - return even if non-ok
// in such a case the creation will be retried in the main thread
++connection_count;
ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
- auto conn = connections.emplace(broker, std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism)).first->second.get();
+ auto conn = connections.emplace(broker_list, std::make_unique<connection_t>(cct, broker_list, use_ssl, verify_ssl, ca_location, user, password, mechanism)).first->second.get();
if (!new_producer(conn)) {
ldout(cct, 10) << "Kafka connect: new connection is created. But producer creation failed. will retry" << dendl;
}
s_manager = nullptr;
}
-bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl,
- boost::optional<const std::string&> ca_location,
- boost::optional<const std::string&> mechanism) {
+bool connect(std::string& broker_list,
+ const std::string& url,
+ bool use_ssl,
+ bool verify_ssl,
+ boost::optional<const std::string&> ca_location,
+ boost::optional<const std::string&> mechanism,
+ boost::optional<const std::string&> brokers) {
if (!s_manager) return false;
- return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism);
+ return s_manager->connect(broker_list, url, use_ssl, verify_ssl, ca_location,
+ mechanism, brokers);
}
int publish(const std::string& conn_name,
void shutdown();
// connect to a kafka endpoint
-bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl, boost::optional<const std::string&> ca_location, boost::optional<const std::string&> mechanism);
+bool connect(std::string& broker_list,
+ const std::string& url,
+ bool use_ssl,
+ bool verify_ssl,
+ boost::optional<const std::string&> ca_location,
+ boost::optional<const std::string&> mechanism,
+ boost::optional<const std::string&> brokers);
// publish a message over a connection that was already created
int publish(const std::string& conn_name,
-nose >=1.0.0
+nose-py3 >=1.0.0
boto >=2.6.0
boto3 >=1.0.0
configparser >=5.0.0
class KafkaReceiver(object):
"""class for receiving and storing messages on a topic from the kafka broker"""
- def __init__(self, topic, security_type):
+ def __init__(self, topic, security_type, kafka_server='localhost'):
from kafka import KafkaConsumer
remaining_retries = 10
port = 9092
if security_type != 'PLAINTEXT':
security_type = 'SSL'
port = 9093
+
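+ # resolve the bootstrap endpoint: default to localhost, add the
+ # security-appropriate port to a bare hostname, or use host:port as given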
+ if kafka_server is None:
+ endpoint = "localhost" + ":" + str(port)
+ elif ":" not in kafka_server:
+ endpoint = kafka_server + ":" + str(port)
+ else:
+ endpoint = kafka_server
+
while remaining_retries > 0:
try:
- self.consumer = KafkaConsumer(topic,
- bootstrap_servers = kafka_server+':'+str(port),
+ self.consumer = KafkaConsumer(topic,
+ bootstrap_servers=endpoint,
security_protocol=security_type,
consumer_timeout_ms=16000)
print('Kafka consumer created on topic: '+topic)
print('Kafka receiver ended unexpectedly: ' + str(error))
-def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
+def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers=None):
"""create kafka receiver and thread"""
- receiver = KafkaReceiver(topic, security_type)
+ receiver = KafkaReceiver(topic, security_type, kafka_server=kafka_brokers)
task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
task.daemon = True
return task, receiver
conn.delete_bucket(bucket_name)
+def notification_push(endpoint_type, conn, account=None, cloudevents=False, kafka_brokers=None):
+ """ test pushinging notification """
+ zonegroup = get_config_zonegroup()
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ host = get_ip()
+ task = None
+ if endpoint_type == 'http':
+ # create random port for the http server
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ receiver = HTTPServerWithEvents((host, port), cloudevents=cloudevents)
+ endpoint_address = 'http://'+host+':'+str(port)
+ if cloudevents:
+ endpoint_args = 'push-endpoint='+endpoint_address+'&cloudevents=true'
+ else:
+ endpoint_args = 'push-endpoint='+endpoint_address
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+ elif endpoint_type == 'amqp':
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+ endpoint_address = 'amqp://' + host
+ # with acks from broker
+ exchange = 'ex1'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ # create s3 topic
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+ elif endpoint_type == 'kafka':
+ # start kafka receiver
+ task, receiver = create_kafka_receiver_thread(topic_name, kafka_brokers=kafka_brokers)
+ task.start()
+ endpoint_address = 'kafka://' + kafka_server
+ # with acks from broker
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
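+ # pass the extra broker list to the topic so rgw can fail over between brokers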
+ if kafka_brokers is not None:
+ endpoint_args += '&kafka-brokers=' + kafka_brokers
+ # create s3 topic
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+ else:
+ raise SkipTest('Unknown endpoint type: ' + endpoint_type)
+
+ # create objects in the bucket
+ number_of_objects = 100
+ if cloudevents:
+ number_of_objects = 10
+ client_threads = []
+ etags = []
+ objects_size = {}
+ start_time = time.time()
+ for i in range(number_of_objects):
+ content = str(os.urandom(1024*1024))
+ etag = hashlib.md5(content.encode()).hexdigest()
+ etags.append(etag)
+ object_size = len(content)
+ key = bucket.new_key(str(i))
+ objects_size[key.name] = object_size
+ thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + ' + endpoint_type + ' notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check receiver
+ keys = list(bucket.list())
+ receiver.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size, etags=etags)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target=key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + ' + endpoint_type + ' notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check receiver
+ receiver.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size, etags=etags)
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ receiver.close(task)
+
+
@attr('manual_test')
def test_ps_s3_notification_push_amqp_idleness_check():
""" test pushing amqp s3 notification and checking for connection idleness """
stop_kafka_receiver(receiver, task)
+@attr('kafka_failover')
+def test_notification_push_kafka_multiple_brokers_override():
+ """ test pushing kafka s3 notification on master """
+ conn = connection()
+ notification_push('kafka', conn, kafka_brokers='localhost:9092,localhost:19092')
+
+
+@attr('kafka_failover')
+def test_notification_push_kafka_multiple_brokers_append():
+ """ test pushing kafka s3 notification on master """
+ conn = connection()
+ notification_push('kafka', conn, kafka_brokers='localhost:19092')
+
+
@attr('http_test')
def test_ps_s3_notification_multi_delete_on_master():
""" test deletion of multiple keys on master """