rgw: allow bucket notifications to send messages to kafka with multiple brokers
author: Hoai-Thu Vuong <thuvh87@gmail.com>
Tue, 9 Jul 2024 16:56:51 +0000 (23:56 +0700)
committer: Hoai-Thu Vuong <thuvh87@gmail.com>
Thu, 26 Dec 2024 03:33:54 +0000 (10:33 +0700)
- add a new parameter to define the list of brokers
- update documentation
- change nose to nose-py3
- add test case for multiple brokers (happy case)

Signed-off-by: Hoai-Thu Vuong <thuvh87@gmail.com>
12 files changed:
doc/radosgw/notifications.rst
qa/suites/rgw/notifications/tasks/kafka_failover/+ [new file with mode: 0644]
qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml [new file with mode: 0644]
qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros [new symlink]
qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml [new file with mode: 0644]
qa/tasks/kafka_failover.py [new file with mode: 0644]
qa/tasks/notification_tests.py
src/rgw/driver/rados/rgw_pubsub_push.cc
src/rgw/rgw_kafka.cc
src/rgw/rgw_kafka.h
src/test/rgw/bucket_notification/requirements.txt
src/test/rgw/bucket_notification/test_bn.py

index 05653956be1d7a9e4fb7c0ad6e201228401fc3da..897c280facf180148081aaa9a37caf93ee96d8f3 100644 (file)
@@ -188,6 +188,7 @@ updating, use the name of an existing topic and different endpoint values).
    [&Attributes.entry.15.key=Policy&Attributes.entry.15.value=<policy-JSON-string>]
    [&Attributes.entry.16.key=user-name&Attributes.entry.16.value=<user-name-string>]
    [&Attributes.entry.17.key=password&Attributes.entry.17.value=<password-string>]
+   [&Attributes.entry.18.key=kafka-brokers&Attributes.entry.18.value=<kafka-broker-list>]
 
 Request parameters:
 
@@ -296,6 +297,8 @@ Request parameters:
   - "broker": Messages are considered "delivered" if acked by the broker. (This
     is the default.)
 
+ - kafka-brokers: A comma-separated list of host:port entries for kafka brokers. These brokers (which may include the broker defined in the kafka URI) are added to the kafka URI to support sending notifications to a kafka cluster.
+
 .. note::
 
     - The key-value pair of a specific parameter need not reside in the same
@@ -571,6 +574,7 @@ Valid AttributeName that can be passed:
   - mechanism: may be provided together with user/password (default: ``PLAIN``).
   - kafka-ack-level: No end2end acknowledgement is required. Messages may persist in the
     broker before being delivered to their final destinations. 
+  - kafka-brokers: A comma-separated list of brokers (each ``host`` or ``host:port``, default port 9092) used in addition to the broker in the endpoint.
 
 Notifications
 ~~~~~~~~~~~~~
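The new ``kafka-brokers`` attribute combines with the existing ``push-endpoint`` and ``kafka-ack-level`` attributes when creating a topic. A minimal sketch, not part of the patch, assuming an SNS-compatible boto3 client already pointed at the RGW endpoint and purely illustrative broker addresses and credentials:

    import boto3

    # SNS-compatible client against RGW (endpoint URL and credentials are assumptions)
    sns = boto3.client('sns',
                       endpoint_url='http://localhost:8000',
                       aws_access_key_id='ACCESS_KEY',
                       aws_secret_access_key='SECRET_KEY',
                       region_name='default')

    # create a topic whose push endpoint is one broker, while kafka-brokers
    # names additional cluster members (it may repeat the broker from the URI)
    topic_arn = sns.create_topic(
        Name='mytopic',
        Attributes={
            'push-endpoint': 'kafka://localhost:9092',
            'kafka-ack-level': 'broker',
            'kafka-brokers': 'localhost:19092,localhost:29092',
        })['TopicArn']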
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/+ b/qa/suites/rgw/notifications/tasks/kafka_failover/+
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml b/qa/suites/rgw/notifications/tasks/kafka_failover/0-install.yaml
new file mode 100644 (file)
index 0000000..5c83d5c
--- /dev/null
@@ -0,0 +1,20 @@
+tasks:
+- install:
+- ceph:
+- openssl_keys:
+- rgw:
+    client.0:
+
+overrides:
+  install:
+    ceph:
+      extra_system_packages:
+        rpm:
+        - java
+        deb:
+        - default-jre
+  ceph:
+    conf:
+      global:
+        osd_min_pg_log_entries: 10
+        osd_max_pg_log_entries: 10
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros b/qa/suites/rgw/notifications/tasks/kafka_failover/supported-distros
new file mode 120000 (symlink)
index 0000000..46280a4
--- /dev/null
@@ -0,0 +1 @@
+../../.qa/distros/supported-random-distro$/
\ No newline at end of file
diff --git a/qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml b/qa/suites/rgw/notifications/tasks/kafka_failover/test_kafka.yaml
new file mode 100644 (file)
index 0000000..01d6fc6
--- /dev/null
@@ -0,0 +1,8 @@
+tasks:
+- kafka-failover:
+    client.0:
+      kafka_version: 3.8.1
+- notification-tests:
+    client.0:
+      extra_attr: ["kafka_failover"]
+      rgw_server: client.0
diff --git a/qa/tasks/kafka_failover.py b/qa/tasks/kafka_failover.py
new file mode 100644 (file)
index 0000000..3ca60ab
--- /dev/null
@@ -0,0 +1,244 @@
+"""
+Deploy and configure Kafka for Teuthology
+"""
+import contextlib
+import logging
+import time
+import os
+
+from teuthology import misc as teuthology
+from teuthology import contextutil
+from teuthology.orchestra import run
+
+log = logging.getLogger(__name__)
+
+def get_kafka_version(config):
+    for client, client_config in config.items():
+        if 'kafka_version' in client_config:
+            kafka_version = client_config.get('kafka_version')
+    return kafka_version
+
+kafka_prefix = 'kafka_2.13-'
+
+def get_kafka_dir(ctx, config):
+    kafka_version = get_kafka_version(config)
+    current_version = kafka_prefix + kafka_version
+    return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx),ver=current_version)
+
+
+@contextlib.contextmanager
+def install_kafka(ctx, config):
+    """
+    Downloading the kafka tar file.
+    """
+    assert isinstance(config, dict)
+    log.info('Installing Kafka...')
+
+    # programmatically find a nearby mirror so as not to hammer archive.apache.org
+    apache_mirror_cmd="curl 'https://www.apache.org/dyn/closer.cgi' 2>/dev/null | " \
+        "grep -o '<strong>[^<]*</strong>' | sed 's/<[^>]*>//g' | head -n 1"
+    log.info("determining apache mirror by running: " + apache_mirror_cmd)
+    apache_mirror_url_front = os.popen(apache_mirror_cmd).read().rstrip() # note: includes trailing slash (/)
+    log.info("chosen apache mirror is " + apache_mirror_url_front)
+
+    for (client, _) in config.items():
+        (remote,) = ctx.cluster.only(client).remotes.keys()
+        test_dir=teuthology.get_testdir(ctx)
+        current_version = get_kafka_version(config)
+
+        kafka_file =  kafka_prefix + current_version + '.tgz'
+
+        link1 = '{apache_mirror_url_front}/kafka/'.format(apache_mirror_url_front=apache_mirror_url_front) + \
+            current_version + '/' + kafka_file
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1],
+        )
+
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', kafka_file],
+        )
+
+        kafka_dir = get_kafka_dir(ctx, config)
+        # create config for second broker
+        second_broker_config_name = "server2.properties"
+        second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
+        second_broker_data_logs_escaped = "{}/logs".format(second_broker_data).replace("/", "\/")
+
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}'.format(tdir=kafka_dir), run.Raw('&&'), 
+             'cp', '{tdir}/config/server.properties'.format(tdir=kafka_dir), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), 
+             'mkdir', '-p', '{tdir}/data'.format(tdir=kafka_dir)
+            ],
+        )
+
+        # edit config
+        ctx.cluster.only(client).run(
+            args=['sed', '-i', 's/broker.id=0/broker.id=1/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+                  'sed', '-i', 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+                  'sed', '-i', 's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+                  'sed', '-i', 's/log.dirs=\/tmp\/kafka-logs/log.dirs={}/g'.format(second_broker_data_logs_escaped), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+                  'cat', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name)
+            ]
+        )
+
+    try:
+        yield
+    finally:
+        log.info('Removing packaged dependencies of Kafka...')
+        test_dir=get_kafka_dir(ctx, config)
+        current_version = get_kafka_version(config)
+        for (client,_) in config.items():
+            ctx.cluster.only(client).run(
+                args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)],
+            )
+
+            ctx.cluster.only(client).run(
+                args=['rm', '-rf', test_dir],
+            )
+
+            ctx.cluster.only(client).run(
+                args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=kafka_file)],
+            )
+
+
+@contextlib.contextmanager
+def run_kafka(ctx,config):
+    """
+    This includes two parts:
+    1. Starting Zookeeper service
+    2. Starting Kafka service
+    """
+    assert isinstance(config, dict)
+    log.info('Bringing up Zookeeper and Kafka services...')
+    for (client,_) in config.items():
+        (remote,) = ctx.cluster.only(client).remotes.keys()
+        kafka_dir = get_kafka_dir(ctx, config)
+
+        second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
+        second_broker_java_log_dir = "{}/java_logs".format(second_broker_data)
+
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+             './zookeeper-server-start.sh',
+             '{tir}/config/zookeeper.properties'.format(tir=kafka_dir),
+             run.Raw('&'), 'exit'
+            ],
+        )
+
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+             './kafka-server-start.sh',
+             '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
+             run.Raw('&'), 'exit'
+            ],
+        )
+        
+        ctx.cluster.only(client).run(
+            args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+             run.Raw('LOG_DIR={second_broker_java_log_dir}'.format(second_broker_java_log_dir=second_broker_java_log_dir)), 
+             './kafka-server-start.sh', '{tdir}/config/server2.properties'.format(tdir=kafka_dir),
+             run.Raw('&'), 'exit'
+            ],
+        )
+
+    try:
+        yield
+    finally:
+        log.info('Stopping Zookeeper and Kafka Services...')
+
+        for (client, _) in config.items():
+            (remote,) = ctx.cluster.only(client).remotes.keys()
+
+            ctx.cluster.only(client).run(
+                args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+                 './kafka-server-stop.sh',  
+                 '{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)),
+                ],
+            )
+
+            time.sleep(5)
+
+            ctx.cluster.only(client).run(
+                args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), 
+                 './zookeeper-server-stop.sh',
+                 '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
+                ],
+            )
+
+            time.sleep(5)
+
+            ctx.cluster.only(client).run(args=['killall', '-9', 'java'])
+
+
+@contextlib.contextmanager
+def run_admin_cmds(ctx,config):
+    """
+    Run Kafka admin commands to verify topic creation and that the producer and consumer work.
+    """
+    assert isinstance(config, dict)
+    log.info('Checking kafka server through producer/consumer commands...')
+    for (client,_) in config.items():
+        (remote,) = ctx.cluster.only(client).remotes.keys()
+
+        ctx.cluster.only(client).run(
+            args=[
+                'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), 
+                './kafka-topics.sh', '--create', '--topic', 'quickstart-events',
+                '--bootstrap-server', 'localhost:9092'
+            ],
+        )
+
+        ctx.cluster.only(client).run(
+            args=[
+                'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+                'echo', "First", run.Raw('|'),
+                './kafka-console-producer.sh', '--topic', 'quickstart-events',
+                '--bootstrap-server', 'localhost:9092'
+            ],
+        )
+
+        ctx.cluster.only(client).run(
+            args=[
+                'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+                './kafka-console-consumer.sh', '--topic', 'quickstart-events',
+                '--from-beginning',
+                '--bootstrap-server', 'localhost:9092',
+                run.Raw('&'), 'exit'
+            ],
+        )
+
+    try:
+        yield
+    finally:
+        pass
+
+
+@contextlib.contextmanager
+def task(ctx,config):
+    """
+    Following is an example of how to run the kafka-failover task::
+    tasks:
+    - kafka-failover:
+        client.0:
+          kafka_version: 2.6.0
+    """
+    assert config is None or isinstance(config, list) \
+        or isinstance(config, dict), \
+        "task kafka only supports a list or dictionary for configuration"
+
+    all_clients = ['client.{id}'.format(id=id_)
+                   for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
+    if config is None:
+        config = all_clients
+    if isinstance(config, list):
+        config = dict.fromkeys(config)
+
+    log.debug('Kafka config is %s', config)
+
+    with contextutil.nested(
+        lambda: install_kafka(ctx=ctx, config=config),
+        lambda: run_kafka(ctx=ctx, config=config),
+        lambda: run_admin_cmds(ctx=ctx, config=config),
+        ):
+        yield
+
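For reference, a short sketch of the second broker's effective overrides in ``config/server2.properties`` (values as produced by the ``sed`` edits in ``install_kafka`` above; the dictionary itself is only illustrative):

    # effective overrides applied to config/server2.properties for broker 2
    second_broker_overrides = {
        'broker.id': '1',
        'listeners': 'PLAINTEXT://localhost:19092',
        'advertised.listeners': 'PLAINTEXT://localhost:19092',
        'log.dirs': '<kafka_dir>/data/broker02/logs',  # under the data dir created above
    }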
index b4697a6f797f07f040bdd90b72c137ecd2647502..f1eae3c89c4e2fdf20540d82278992fadb0f8f8d 100644 (file)
@@ -220,7 +220,7 @@ def run_tests(ctx, config):
     for client, client_config in config.items():
         (remote,) = ctx.cluster.only(client).remotes.keys()
 
-        attr = ["!kafka_test", "!data_path_v2_kafka_test", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"]
+        attr = ["!kafka_test", "!data_path_v2_kafka_test", "!kafka_failover", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"]
 
         if 'extra_attr' in client_config:
             attr = client_config.get('extra_attr')
index 07d65fa10280f6e92745f4f8a6475456c0de733e..d22c61e9b08578e8f732571b23e670528eaebf24 100644 (file)
@@ -281,7 +281,7 @@ public:
            conn_id, _endpoint, get_bool(args, "use-ssl", false),
            get_bool(args, "verify-ssl", true), args.get_optional("ca-location"),
            args.get_optional("mechanism"), args.get_optional("user-name"),
-           args.get_optional("password"))) {
+           args.get_optional("password"), args.get_optional("kafka-brokers"))) {
      throw configuration_error("Kafka: failed to create connection to: " +
                                _endpoint);
    }
@@ -434,4 +434,3 @@ void RGWPubSubEndpoint::shutdown_all() {
 #endif
   shutdown_http_manager();
 }
-
index 0807993338d586ea9c2577063146f8c5871a1a07..b38b1a78ec476521ddbd967317c65871b00ff82c 100644 (file)
@@ -13,6 +13,7 @@
 #include <thread>
 #include <atomic>
 #include <mutex>
+#include <boost/algorithm/string.hpp>
 #include <boost/functional/hash.hpp>
 #include <boost/lockfree/queue.hpp>
 #include "common/dout.h"
@@ -595,7 +596,8 @@ public:
                boost::optional<const std::string&> ca_location,
                boost::optional<const std::string&> mechanism,
                boost::optional<const std::string&> topic_user_name,
-               boost::optional<const std::string&> topic_password) {
+               boost::optional<const std::string&> topic_password,
+               boost::optional<const std::string&> brokers) {
     if (stopped) {
       ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
       return false;
@@ -603,8 +605,8 @@ public:
 
     std::string user;
     std::string password;
-    std::string broker;
-    if (!parse_url_authority(url, broker, user, password)) {
+    std::string broker_list;
+    if (!parse_url_authority(url, broker_list, user, password)) {
       // TODO: increment counter
       ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
       return false;
@@ -632,7 +634,13 @@ public:
       ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
       return false;
     }
-    connection_id_t tmp_id(broker, user, password, ca_location, mechanism,
+
+    if (brokers.has_value()) {
+      broker_list.append(",");
+      broker_list.append(brokers.get());
+    }
+
+    connection_id_t tmp_id(broker_list, user, password, ca_location, mechanism,
                            use_ssl);
     std::lock_guard lock(connections_lock);
     const auto it = connections.find(tmp_id);
@@ -652,7 +660,7 @@ public:
       return false;
     }
 
-    auto conn = std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism);
+    auto conn = std::make_unique<connection_t>(cct, broker_list, use_ssl, verify_ssl, ca_location, user, password, mechanism);
     if (!new_producer(conn.get())) {
       ldout(cct, 10) << "Kafka connect: producer creation failed in new connection" << dendl;
       return false;
@@ -770,11 +778,12 @@ bool connect(connection_id_t& conn_id,
              boost::optional<const std::string&> ca_location,
              boost::optional<const std::string&> mechanism,
              boost::optional<const std::string&> user_name,
-             boost::optional<const std::string&> password) {
+             boost::optional<const std::string&> password,
+             boost::optional<const std::string&> brokers) {
   std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return false;
   return s_manager->connect(conn_id, url, use_ssl, verify_ssl, ca_location,
-                            mechanism, user_name, password);
+                            mechanism, user_name, password, brokers);
 }
 
 int publish(const connection_id_t& conn_id,
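In effect, the connection's bootstrap list becomes the broker parsed from the ``kafka://`` URI with the optional ``kafka-brokers`` value appended. A minimal Python sketch of that assembly (the helper name is hypothetical; the real logic is the C++ hunk above):

    def build_broker_list(uri_authority, kafka_brokers=None):
        """uri_authority: 'host[:port]' parsed from the kafka:// push-endpoint."""
        broker_list = uri_authority
        if kafka_brokers is not None:
            # the attribute may repeat the URI's broker; duplicates are harmless
            broker_list += ',' + kafka_brokers
        return broker_list

    # build_broker_list('localhost:9092', 'localhost:19092')
    #   -> 'localhost:9092,localhost:19092'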
index b7aa0d15759fc0822af6f60b660eb3c17b39d281..858b185219fabffe3b5afcc47d2fc9fdafc664d0 100644 (file)
@@ -48,7 +48,8 @@ bool connect(connection_id_t& conn_id,
              boost::optional<const std::string&> ca_location,
              boost::optional<const std::string&> mechanism,
              boost::optional<const std::string&> user_name,
-             boost::optional<const std::string&> password);
+             boost::optional<const std::string&> password,
+             boost::optional<const std::string&> brokers);
 
 // publish a message over a connection that was already created
 int publish(const connection_id_t& conn_id,
index a3cff2bedabb333094172eba9ce56eed380cc5ce..bb74eceedc3933ba26c962ae73ee128422715f97 100644 (file)
@@ -1,4 +1,4 @@
-nose >=1.0.0
+nose-py3 >=1.0.0
 boto >=2.6.0
 boto3 >=1.0.0
 configparser >=5.0.0
index 90ee33617fe8670131998d550929ae4313ec9e90..83d66b77b4c63f0b5beecc15b74caf98615ae074 100644 (file)
@@ -410,17 +410,25 @@ kafka_server = 'localhost'
 
 class KafkaReceiver(object):
     """class for receiving and storing messages on a topic from the kafka broker"""
-    def __init__(self, topic, security_type):
+    def __init__(self, topic, security_type, kafka_server='localhost'):
         from kafka import KafkaConsumer
         remaining_retries = 10
         port = 9092
         if security_type != 'PLAINTEXT':
             security_type = 'SSL'
             port = 9093
+
+        if kafka_server is None:
+            endpoint = "localhost" + ":" + str(port)
+        elif ":" not in kafka_server:
+            endpoint = kafka_server + ":" + str(port)
+        else:
+            endpoint = kafka_server
+
         while remaining_retries > 0:
             try:
                 self.consumer = KafkaConsumer(topic,
-                        bootstrap_servers = kafka_server+':'+str(port),
+                        bootstrap_servers=endpoint,
                         security_protocol=security_type,
                         consumer_timeout_ms=16000,
                         auto_offset_reset='earliest')
@@ -468,9 +476,9 @@ def kafka_receiver_thread_runner(receiver):
         print('Kafka receiver ended unexpectedly: ' + str(error))
 
 
-def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
+def create_kafka_receiver_thread(topic, security_type='PLAINTEXT', kafka_brokers=None):
     """create kafka receiver and thread"""
-    receiver = KafkaReceiver(topic, security_type)
+    receiver = KafkaReceiver(topic, security_type, kafka_server=kafka_brokers)
     task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
     task.daemon = True
     return task, receiver
@@ -1304,7 +1312,7 @@ def test_ps_s3_notification_errors_on_master():
     conn.delete_bucket(bucket_name)
 
 
-def notification_push(endpoint_type, conn, account=None, cloudevents=False):
+def notification_push(endpoint_type, conn, account=None, cloudevents=False, kafka_brokers=None):
     """ test pushinging notification """
     zonegroup = get_config_zonegroup()
     # create bucket
@@ -1359,11 +1367,13 @@ def notification_push(endpoint_type, conn, account=None, cloudevents=False):
         assert_equal(status/100, 2)
     elif endpoint_type == 'kafka':
         # start amqp receiver
-        task, receiver = create_kafka_receiver_thread(topic_name)
+        task, receiver = create_kafka_receiver_thread(topic_name, kafka_brokers=kafka_brokers)
         task.start()
         endpoint_address = 'kafka://' + kafka_server
         # without acks from broker
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+        if kafka_brokers is not None:
+            endpoint_args += '&kafka-brokers=' + kafka_brokers
         # create s3 topic
         topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
         topic_arn = topic_conf.set_config()
@@ -1581,6 +1591,20 @@ def test_notification_push_kafka():
     notification_push('kafka', conn)
 
 
+@attr('kafka_failover')
+def test_notification_push_kafka_multiple_brokers_override():
+    """ test pushing kafka s3 notification on master """
+    conn = connection()
+    notification_push('kafka', conn, kafka_brokers='localhost:9092,localhost:19092')
+
+
+@attr('kafka_failover')
+def test_notification_push_kafka_multiple_brokers_append():
+    """ test pushing kafka s3 notification on master """
+    conn = connection()
+    notification_push('kafka', conn, kafka_brokers='localhost:19092')
+
+
 @attr('http_test')
 def test_ps_s3_notification_multi_delete_on_master():
     """ test deletion of multiple keys on master """