delete_all_topics, \
put_object_tagging, \
admin, \
+ ceph_admin, \
set_rgw_config_option, \
bash, \
S3Connection, \
persistent_topic_configs(persistency_time, config_dict)
@pytest.mark.manual_test
-def test_persistent_notificationback():
+def test_persistent_notification_pushback():
""" test pushing persistent notification pushback """
conn = connection()
zonegroup = get_config_zonegroup()
http_server.close()
+def verify_idleness(port, sleep_time, max_time):
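+ """ lower rgw_kafka_connection_idle to max_time and poll every sleep_time seconds until the radosgw<->kafka connection on the given port goes idle """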
+ set_rgw_config_option('client.rgw', 'rgw_kafka_connection_idle', max_time, get_config_cluster())
+ is_idle = False
+ start_time = time.time()
+ while not is_idle:
+ time.sleep(sleep_time)
+ cmd = "ss -tnp | grep {} | grep radosgw".format(port)
+ proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
+ out = proc.communicate()[0]
+ if len(out) == 0:
+ is_idle = True
+ else:
+ log.info("radosgw<->kafka connection is not idle: %s", out.decode('utf-8'))
+ time_diff = time.time() - start_time
+ if time_diff > max_time + sleep_time:
+ assert False, "radosgw<->kafka connection is still not idle after {}s".format(time_diff)
+
+ # restore the original idle timeout
+ set_rgw_config_option('client.rgw', 'rgw_kafka_connection_idle', 300, get_config_cluster())
+
+
@pytest.mark.kafka_test
def test_notification_kafka_idle_behaviour():
""" test pushing kafka notification idle behaviour check """
time.sleep(5)
receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
- is_idle = False
-
- while not is_idle:
- print('waiting for 10sec for checking idleness')
- time.sleep(10)
- cmd = "ss -tnp | grep 9092 | grep radosgw"
- proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
- out = proc.communicate()[0]
- if len(out) == 0:
- is_idle = True
- else:
- print("radosgw<->kafka connection is not idle")
- print(out.decode('utf-8'))
+ verify_idleness(9092, 10, 30)
# do the process of uploading an object and checking for notification again
number_of_objects = 10
s3_notification_conf.del_config()
topic_conf.del_config()
conn.delete_bucket(bucket_name)
+
+
+def kafka_batch_size(match_batch_size):
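+ """ helper for the batch size tests: queue events on a broken endpoint, optionally set rgw_kafka_max_batch_size to match the broker limit, then fix the endpoint and check whether the queue drains """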
+ kafka_dir = os.environ.get('KAFKA_DIR')
+ if not kafka_dir:
+ pytest.skip('KAFKA_DIR environment variable is not set')
+
+ conn = connection()
+ zonegroup = get_config_zonegroup()
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ host = get_ip()
+ wrong_port = 1234
+ right_port = 9092
+ max_batch_size = 4096
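+ # the broker's max.message.bytes is set to 2x this value below; without
+ # rgw_kafka_max_batch_size, batches of the queued 1KB messages exceed that limit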
+
+ # start kafka receiver
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
+ verify_kafka_receiver(receiver)
+
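+ # lower max.message.bytes on the topic so that oversized batches from radosgw are rejected by the broker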
+ kafka_configs = os.path.join(kafka_dir, 'bin/kafka-configs.sh')
+ result = subprocess.run(
+ [kafka_configs,
+ '--bootstrap-server', host + ':' + str(right_port),
+ '--entity-type', 'topics',
+ '--entity-name', topic_name,
+ '--alter',
+ '--add-config', 'max.message.bytes={}'.format(max_batch_size*2)],
+ capture_output=True, text=True, timeout=15, check=False
+ )
+ assert result.returncode == 0, 'kafka-configs.sh failed: {}'.format(result.stderr)
+
+ # create RGW topic with the wrong port so messages queue up
+ # set retry to 1 second so that message retries are not too fast
+ endpoint_address = 'kafka://' + host + ':' + str(wrong_port)
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&persistent=true' + \
+ '&retry_sleep_duration=1'
+
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+
+ # create notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert status/100 == 2
+
+ # upload 100 objects to the bucket
+ number_of_objects = 100
+ client_threads = []
+ for i in range(number_of_objects):
+ key = bucket.new_key('key-' + str(i))
+ content = str(os.urandom(1024))
+ thr = threading.Thread(target=set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ # make sure that any existing connection becomes idle and gets deleted
+ verify_idleness(right_port, 2, 30)
+
+ if match_batch_size:
+ # set rgw_kafka_max_batch_size using the daemon-specific entity name
+ rgw_client = 'client.rgw.{}'.format(get_config_port())
+ set_rgw_config_option(rgw_client, 'rgw_kafka_max_batch_size', max_batch_size, get_config_cluster())
+ # verify config is set in mon store
+ result = ceph_admin(['config', 'get', rgw_client, 'rgw_kafka_max_batch_size'], get_config_cluster())
+ assert result[1] == 0, 'failed to get config from mon store'
+ actual_value = result[0].strip().split('\n')[-1]
+ assert actual_value == str(max_batch_size), \
+ 'rgw_kafka_max_batch_size not set in mon store: got {} expected {}'.format(actual_value, max_batch_size)
+ # wait for config to propagate from monitor to RGW daemon
+ time.sleep(10)
+
+ # fix the topic to point to the correct broker
+ endpoint_address = 'kafka://' + host + ':' + str(right_port)
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&kafka-ack-level=broker&persistent=true'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+
+ if match_batch_size:
+ # the queue should drain because batch size config matches the broker limit
+ wait_for_queue_to_drain(topic_name)
+ # verify events received
+ keys = list(bucket.list())
+ receiver.verify_s3_events(keys, exact_match=True, deletions=False)
+ else:
+ # wait and verify that the queue does NOT fully drain
+ # without the batch size config, batched messages exceed max.message.bytes
+ time.sleep(30)
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+ assert result[1] == 0
+ parsed_result = json.loads(result[0])
+ assert parsed_result['Topic Stats']['Entries'] > 0, \
+ 'queue should not drain without batch size config'
+
+ # cleanup
+ if match_batch_size:
+ rgw_client = 'client.rgw.{}'.format(get_config_port())
+ set_rgw_config_option(rgw_client, 'rgw_kafka_max_batch_size', 0, get_config_cluster())
+ kafka_topics = os.path.join(kafka_dir, 'bin/kafka-topics.sh')
+ subprocess.run(
+ [kafka_topics,
+ '--delete', '--topic', topic_name,
+ '--bootstrap-server', host + ':' + str(right_port)],
+ capture_output=True, text=True, timeout=15, check=False
+ )
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete objects before deleting the bucket
+ delete_all_objects(conn, bucket_name)
+ conn.delete_bucket(bucket_name)
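+ # the receiver is stopped only in the match case, where the queue drained and events were delivered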
+ if match_batch_size:
+ receiver.close(task)
+
+
+@pytest.mark.kafka_test
+def test_kafka_batch_size():
+ """ test that setting rgw_kafka_max_batch_size limits the batch size sent to kafka """
+ kafka_batch_size(match_batch_size=True)
+
+
+@pytest.mark.kafka_test
+def test_kafka_batch_size_mismatch():
+ """ test that without rgw_kafka_max_batch_size, batched messages exceed the broker limit """
+ kafka_batch_size(match_batch_size=False)
+