From 8713c3169c0f9df1d2fc23ff2b82ede1e25be282 Mon Sep 17 00:00:00 2001
From: Kalpesh Pandya
Date: Thu, 19 Aug 2021 13:18:03 +0530
Subject: [PATCH] src/rgw: Connection timeout implementation

Check the idleness of each Kafka connection and delete the connection
if it has been idle for 30 seconds. A manual test is added to check
the behavior.

Fixes: https://tracker.ceph.com/issues/50074

Signed-off-by: Kalpesh Pandya
---
 src/rgw/rgw_kafka.cc                        |  11 ++
 src/test/rgw/bucket_notification/test_bn.py | 126 ++++++++++++++++++++
 2 files changed, 137 insertions(+)

diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc
index 3847453551f5b..9b75c9d088574 100644
--- a/src/rgw/rgw_kafka.cc
+++ b/src/rgw/rgw_kafka.cc
@@ -73,6 +73,7 @@ struct connection_t {
   const boost::optional<const std::string> ca_location;
   const std::string user;
   const std::string password;
+  utime_t timestamp = ceph_clock_now();
 
   // cleanup of all internal connection resource
   // the object can still remain, and internal connection
@@ -294,6 +295,7 @@ public:
   const size_t max_connections;
   const size_t max_inflight;
   const size_t max_queue;
+  const size_t max_idle_time;
 private:
   std::atomic<size_t> connection_count;
   bool stopped;
@@ -311,6 +313,8 @@ private:
     const std::unique_ptr<message_wrapper_t> msg_owner(message);
     auto& conn = message->conn;
 
+    conn->timestamp = ceph_clock_now();
+
     if (!conn->is_ok()) {
       // connection had an issue while message was in the queue
       // TODO add error stats
@@ -416,6 +420,12 @@ private:
 
         auto& conn = conn_it->second;
 
+        // check the connection idleness
+        if (conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+          ldout(conn->cct, 20) << "Kafka run: deleting connection due to idleness. Current time: " << ceph_clock_now() << dendl;
+          ERASE_AND_CONTINUE(conn_it, connections);
+        }
+
         // try to reconnect the connection if it has an error
         if (!conn->is_ok()) {
           ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl;
@@ -458,6 +468,7 @@ public:
     max_connections(_max_connections),
     max_inflight(_max_inflight),
     max_queue(_max_queue),
+    max_idle_time(30),
     connection_count(0),
     stopped(false),
     read_timeout_ms(_read_timeout_ms),
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index ac72b1c2bc16b..228ff9f447589 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -2224,6 +2224,132 @@ def test_ps_s3_persistent_notification_pushback():
     http_server.close()
 
 
+@attr('manual_test')
+def test_ps_s3_notification_kafka_idle_behaviour():
+    """ test kafka connection idle behaviour with s3 notifications """
+    # TODO: convert this into an automated test by replacing the
+    # os.system call with a programmatic check of connection idleness
+    return SkipTest("only used in manual testing")
+    conn = connection()
+    zonegroup = 'default'
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    # name is constant for manual testing
+    topic_name = bucket_name+'_topic'
+    # create consumer on the topic
+
+    task, receiver = create_kafka_receiver_thread(topic_name+'_1')
+    task.start()
+
+    # create s3 topic
+    endpoint_address = 'kafka://' + kafka_server
+    # with acks from broker
+    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+    topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+    topic_arn1 = topic_conf1.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
+                         'Events': []
+                       }]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    etags = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        content = str(os.urandom(1024*1024))
+        etag = hashlib.md5(content.encode()).hexdigest()
+        etags.append(etag)
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    keys = list(bucket.list())
+    receiver.verify_s3_events(keys, exact_match=True, etags=etags)
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
+
+    print('waiting for 40sec for checking idleness')
+    time.sleep(40)
+
+    os.system("netstat -nnp | grep 9092")
+
+    # upload objects and check for notifications again after the idle period
+    number_of_objects = 10
+    client_threads = []
+    etags = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        content = str(os.urandom(1024*1024))
+        etag = hashlib.md5(content.encode()).hexdigest()
+        etags.append(etag)
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    keys = list(bucket.list())
+    receiver.verify_s3_events(keys, exact_match=True, etags=etags)
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
+
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf1.del_config()
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    stop_kafka_receiver(receiver, task)
+
+
 @attr('modification_required')
 def test_ps_s3_persistent_gateways_recovery():
     """ test gateway recovery of persistent notifications """
-- 
2.39.5
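
For reference, below is a minimal, standalone C++ sketch of the idle-eviction pattern this patch applies inside the Kafka manager's run loop: each connection carries a last-used timestamp that is refreshed whenever a message is published on it, and the loop periodically erases any connection that has been idle longer than a threshold (30 seconds in the patch). The names Conn, ConnMap and evict_idle are hypothetical, and std::chrono stands in for utime_t; this is an illustration of the approach under those assumptions, not the RGW code itself.

// Standalone sketch of the idle-eviction pattern; Conn, ConnMap and
// evict_idle are hypothetical names used only for illustration.
#include <chrono>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <thread>

struct Conn {
  // refreshed every time a message is published on this connection,
  // mirroring the conn->timestamp update in the patch above
  std::chrono::steady_clock::time_point last_used =
      std::chrono::steady_clock::now();
};

using ConnMap = std::map<std::string, std::unique_ptr<Conn>>;

// remove connections that have been idle for longer than max_idle
void evict_idle(ConnMap& conns, std::chrono::seconds max_idle) {
  const auto now = std::chrono::steady_clock::now();
  for (auto it = conns.begin(); it != conns.end();) {
    if (now - it->second->last_used > max_idle) {
      it = conns.erase(it);  // erase returns the next valid iterator
    } else {
      ++it;
    }
  }
}

int main() {
  ConnMap conns;
  conns["kafka://broker1"] = std::make_unique<Conn>();
  conns["kafka://broker2"] = std::make_unique<Conn>();

  // simulate broker2 being used recently while broker1 stays idle
  std::this_thread::sleep_for(std::chrono::seconds(2));
  conns["kafka://broker2"]->last_used = std::chrono::steady_clock::now();

  evict_idle(conns, std::chrono::seconds(1));
  std::cout << "connections left: " << conns.size() << std::endl;  // prints 1
  return 0;
}

Erasing while iterating has to use the iterator returned by map::erase(), which is the same concern the ERASE_AND_CONTINUE macro handles in the connection loop of the patch.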