]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/kafka: setting maximum batch size when sending messages
authorYuval Lifshitz <ylifshit@ibm.com>
Mon, 20 Apr 2026 14:06:25 +0000 (14:06 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Sun, 26 Apr 2026 06:19:55 +0000 (06:19 +0000)
Fixes: https://tracker.ceph.com/issues/75928
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
doc/radosgw/notifications.rst
qa/tasks/kafka_failover.py
src/common/options/rgw.yaml.in
src/rgw/rgw_kafka.cc
src/test/rgw/bucket_notification/test_bn.py

index 2fa9091348333d533ecc93f0ed73ca8b41899dcb..571bb0068c6eca5b19d476e8be6236fde5daaf40 100644 (file)
@@ -170,6 +170,15 @@ HTTP
 .. confval:: rgw_http_notif_connection_timeout
 .. confval:: rgw_http_notif_max_inflight
 
+Kafka
+~~~~~
+After recovering from a broker failure, a persistent topic will try
+to resend all notifications in batches. If the topic is configured on
+the broker with a segment size smaller than our default (1MB), sending
+the messages would fail. If we know that we have such segment size
+configuration, we should send smaller batches using:
+
+.. confval:: rgw_kafka_max_batch_size
 
 Bucket Notification REST API
 ----------------------------
index ab89907c79a810efee8f198fbeeb2914508bd42f..8f1794865d62588dc943871c9128afc0bc89bfb5 100644 (file)
@@ -306,6 +306,8 @@ def task(ctx,config):
     if isinstance(config, list):
         config = dict.fromkeys(config)
 
+    ctx.kafka_dir = get_kafka_dir(ctx, config)
+
     log.debug('Kafka config is %s', config)
 
     with contextutil.nested(
index 57000c5154b53618b4ddd61a8a75e90d3fa7e87c..de19438a8e19755d2fad41c99b8c8b8fd517f204 100644 (file)
@@ -4490,6 +4490,17 @@ options:
   services:
   - rgw
   with_legacy: true
+- name: rgw_kafka_max_batch_size
+  type: uint
+  level: advanced
+  desc: This is the maximum size in bytes of a batch of messages sent to kafka
+  long_desc: This is the maximum size in bytes of a batch of messages sent to kafka. Messages will be sent
+    in batches to improve performance, and this option sets the maximum size of those batches.
+    If set to zero, the value is not set and the default batch size will be used.
+  default: 0
+  services:
+  - rgw
+  with_legacy: true
 - name: rgw_http_notif_message_timeout
   type: uint
   level: advanced
index 658f2ed2c809941ccc521415795b3ae407050ccf..e4d6074f27afd8ef28724b21717d54aca04b23ca 100644 (file)
@@ -286,9 +286,22 @@ bool new_producer(connection_t* conn) {
   // however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms
   constexpr std::uint64_t min_message_timeout = 1;
   const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout);
+  const auto batch_size = conn->cct->_conf->rgw_kafka_max_batch_size;
+  ldout(conn->cct, 1) << "Kafka connect: broker=" << conn->broker
+      << " use_ssl=" << conn->use_ssl
+      << " user=" << conn->user
+      << " message_timeout=" << message_timeout
+      << " batch_size=" << batch_size << dendl;
+
   if (rd_kafka_conf_set(conf.get(), "message.timeout.ms", 
         std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
 
+  if (batch_size > 0) {
+    if (rd_kafka_conf_set(conf.get(), "batch.size",
+        std::to_string(batch_size).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+    if (rd_kafka_conf_set(conf.get(), "message.max.bytes",
+        std::to_string(batch_size).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+  }
   // get list of brokers based on the bootstrap broker
   if (rd_kafka_conf_set(conf.get(), "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
 
index 1155ed2813512ec76e2a391aaf5597cb142d6397..974bf68f603796be12e095c670c03d2ddfdc9cd4 100644 (file)
@@ -39,6 +39,7 @@ from .api import PSTopicS3, \
     delete_all_topics, \
     put_object_tagging, \
     admin, \
+    ceph_admin, \
     set_rgw_config_option, \
     bash, \
     S3Connection, \
@@ -3583,7 +3584,7 @@ def test_persistent_topic_configs_max_retries():
     persistent_topic_configs(persistency_time, config_dict)
 
 @pytest.mark.manual_test
-def test_persistent_notificationback():
+def test_persistent_notification_pushback():
     """ test pushing persistent notification pushback """
     conn = connection()
     zonegroup = get_config_zonegroup()
@@ -3663,6 +3664,27 @@ def test_persistent_notificationback():
     http_server.close()
 
 
+def verify_idleness(port, sleep_time, max_time):
+    set_rgw_config_option('client.rgw', 'rgw_kafka_connection_idle', max_time, get_config_cluster())
+    is_idle = False
+    start_time = time.time()
+    while not is_idle:
+        time.sleep(sleep_time)
+        cmd = "ss -tnp | grep {} | grep radosgw".format(port)
+        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
+        out = proc.communicate()[0]
+        if len(out) == 0:
+            is_idle = True
+        else:
+            log.info("radosgw<->kafka connection is not idle: %s", out.decode('utf-8'))
+            time_diff = time.time() - start_time
+            if time_diff > max_time + sleep_time:
+                assert False, "radosgw<->kafka connection is still not idle after {}s".format(time_diff)
+
+    # set the original idle time
+    set_rgw_config_option('client.rgw', 'rgw_kafka_connection_idle', 300, get_config_cluster())
+
+
 @pytest.mark.kafka_test
 def test_notification_kafka_idle_behaviour():
     """ test pushing kafka notification idle behaviour check """
@@ -3735,19 +3757,7 @@ def test_notification_kafka_idle_behaviour():
     time.sleep(5)
     receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
 
-    is_idle = False
-
-    while not is_idle:
-        print('waiting for 10sec for checking idleness')
-        time.sleep(10)
-        cmd = "ss -tnp | grep 9092 | grep radosgw"
-        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
-        out = proc.communicate()[0]
-        if len(out) == 0:
-            is_idle = True
-        else:
-            print("radosgw<->kafka connection is not idle")
-            print(out.decode('utf-8'))
+    verify_idleness(9092, 10, 30)
 
     # do the process of uploading an object and checking for notification again
     number_of_objects = 10
@@ -5931,3 +5941,139 @@ def test_ps_s3_x_amz_request_id_on_master():
     s3_notification_conf.del_config()
     topic_conf.del_config()
     conn.delete_bucket(bucket_name)
+
+
+def kafka_batch_size(match_batch_size):
+    kafka_dir = os.environ.get('KAFKA_DIR')
+    if not kafka_dir:
+        pytest.skip('KAFKA_DIR environment variable is not set')
+
+    conn = connection()
+    zonegroup = get_config_zonegroup()
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    host = get_ip()
+    wrong_port = 1234
+    right_port = 9092
+    max_batch_size = 4096
+
+    # start kafka receiver
+    task, receiver = create_kafka_receiver_thread(topic_name)
+    task.start()
+    verify_kafka_receiver(receiver)
+
+    kafka_configs = os.path.join(kafka_dir, 'bin/kafka-configs.sh')
+    result = subprocess.run(
+        [kafka_configs,
+         '--bootstrap-server', host + ':' + str(right_port),
+         '--entity-type', 'topics',
+         '--entity-name', topic_name,
+         '--alter',
+         '--add-config', 'max.message.bytes={}'.format(max_batch_size*2)],
+        capture_output=True, text=True, timeout=15, check=False
+    )
+    assert result.returncode == 0
+
+    # create RGW topic with wrong port so messages queue up
+    # set retry to 1 second so that messge retry is not too fast
+    endpoint_address = 'kafka://' + host + ':' + str(wrong_port)
+    endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&persistent=true' + \
+                        '&retry_sleep_duration=1'
+
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+
+    # create notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                        'Events': []
+                        }]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert status/100 == 2
+
+    # upload 100 objects to the bucket
+    number_of_objects = 100
+    client_threads = []
+    for i in range(number_of_objects):
+        key = bucket.new_key('key-' + str(i))
+        content = str(os.urandom(1024))
+        thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    # make sure that any existing connection becomes idle and is being deleted
+    verify_idleness(right_port, 2, 30)
+
+    if match_batch_size:
+        # set rgw_kafka_max_batch_size using the daemon-specific entity name
+        rgw_client = 'client.rgw.{}'.format(get_config_port())
+        set_rgw_config_option(rgw_client, 'rgw_kafka_max_batch_size', max_batch_size, get_config_cluster())
+        # verify config is set in mon store
+        result = ceph_admin(['config', 'get', rgw_client, 'rgw_kafka_max_batch_size'], get_config_cluster())
+        assert result[1] == 0, 'failed to get config from mon store'
+        actual_value = result[0].strip().split('\n')[-1]
+        assert actual_value == str(max_batch_size), \
+            'rgw_kafka_max_batch_size not set in mon store: got {} expected {}'.format(actual_value, max_batch_size)
+        # wait for config to propagate from monitor to RGW daemon
+        time.sleep(10)
+
+    # fix the topic to point to correct broker
+    endpoint_address = 'kafka://' + host + ':' + str(right_port)
+    endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&persistent=true'
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+
+    if match_batch_size:
+        # the queue should drain because batch size config matches the broker limit
+        wait_for_queue_to_drain(topic_name)
+        # verify events received
+        keys = list(bucket.list())
+        receiver.verify_s3_events(keys, exact_match=True, deletions=False)
+    else:
+        # wait and verify that the queue does NOT fully drain
+        # without the batch size config, batched messages exceed max.message.bytes
+        time.sleep(30)
+        result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+        assert result[1] == 0
+        parsed_result = json.loads(result[0])
+        assert parsed_result['Topic Stats']['Entries'] > 0, \
+            'queue should not drain without batch size config'
+
+    # cleanup
+    if match_batch_size:
+        rgw_client = 'client.rgw.{}'.format(get_config_port())
+        set_rgw_config_option(rgw_client, 'rgw_kafka_max_batch_size', 0, get_config_cluster())
+    kafka_topics = os.path.join(kafka_dir, 'bin/kafka-topics.sh')
+    subprocess.run(
+        [kafka_topics,
+         '--delete', '--topic', topic_name,
+         '--bootstrap-server', host + ':' + str(right_port)],
+        capture_output=True, text=True, timeout=15, check=False
+    )
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    # delete objects before deleting the bucket
+    delete_all_objects(conn, bucket_name)
+    conn.delete_bucket(bucket_name)
+    if match_batch_size:
+        receiver.close(task)
+
+
+@pytest.mark.kafka_test
+def test_kafka_batch_size():
+    """ test that setting rgw_kafka_max_batch_size limits the batch size sent to kafka """
+    kafka_batch_size(match_batch_size=True)
+
+
+@pytest.mark.kafka_test
+def test_kafka_batch_size_mismatch():
+    """ test that without rgw_kafka_max_batch_size, batched messages exceed the broker limit """
+    kafka_batch_size(match_batch_size=False)
+