git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
src/rgw: Connection timeout implementation 42840/head
author Kalpesh Pandya <kapandya@redhat.com>
Thu, 19 Aug 2021 07:48:03 +0000 (13:18 +0530)
committer Kalpesh Pandya <kapandya@redhat.com>
Wed, 1 Sep 2021 10:52:03 +0000 (16:22 +0530)
Check each Kafka connection for idleness and delete the connection if it has been idle for 30 seconds.
Add a manual test that exercises this behaviour.
This PR fixes: https://tracker.ceph.com/issues/50074

Signed-off-by: Kalpesh Pandya <kapandya@redhat.com>
src/rgw/rgw_kafka.cc
src/test/rgw/bucket_notification/test_bn.py

index 3847453551f5b5b319472663743c0ec62d74a94d..9b75c9d0885749b55d3278cb9e6a27bd7f7b22dd 100644 (file)
@@ -73,6 +73,7 @@ struct connection_t {
   const boost::optional<std::string> ca_location;
   const std::string user;
   const std::string password;
+  utime_t timestamp = ceph_clock_now(); // last time the connection was used
 
   // cleanup of all internal connection resource
   // the object can still remain, and internal connection
@@ -294,6 +295,7 @@ public:
   const size_t max_connections;
   const size_t max_inflight;
   const size_t max_queue;
+  const size_t max_idle_time;
 private:
   std::atomic<size_t> connection_count;
   bool stopped;
@@ -311,6 +313,8 @@ private:
     const std::unique_ptr<message_wrapper_t> msg_owner(message);
     auto& conn = message->conn;
 
+    conn->timestamp = ceph_clock_now();
+
     if (!conn->is_ok()) {
       // connection had an issue while message was in the queue
       // TODO add error stats
@@ -416,6 +420,12 @@ private:
         
         auto& conn = conn_it->second;
 
+        // check whether the connection has been idle for too long
+        if (conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+          ldout(conn->cct, 20) << "Kafka run: deleting connection due to idleness. current time: " << ceph_clock_now() << dendl;
+          ERASE_AND_CONTINUE(conn_it, connections);
+        }
+
         // try to reconnect the connection if it has an error
         if (!conn->is_ok()) {
           ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl;
@@ -458,6 +468,7 @@ public:
     max_connections(_max_connections),
     max_inflight(_max_inflight),
     max_queue(_max_queue),
+    max_idle_time(30), // in seconds
     connection_count(0),
     stopped(false),
     read_timeout_ms(_read_timeout_ms),
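In outline, the change above stamps a connection whenever a message is published on it, and the manager's run loop erases any connection whose stamp is older than max_idle_time (30 seconds). A minimal sketch of that pattern in Python, for illustration only (the names Connection, publish and sweep are made up here; the authoritative version is the C++ diff above):

import time

MAX_IDLE_TIME = 30  # seconds; matches max_idle_time in the diff above

class Connection:
    def __init__(self):
        # stamped at creation, refreshed on every publish
        self.timestamp = time.monotonic()

def publish(connections, name, message):
    # using a connection refreshes its stamp, so only genuinely
    # idle connections age out
    conn = connections[name]
    conn.timestamp = time.monotonic()
    # ... hand the message to the broker here ...

def sweep(connections):
    # the idle check from the run loop: erase while iterating,
    # as ERASE_AND_CONTINUE does in rgw_kafka.cc
    now = time.monotonic()
    for name in list(connections):
        if connections[name].timestamp + MAX_IDLE_TIME < now:
            del connections[name]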
index ac72b1c2bc16b370cc4b45af7bdd992759b8edab..228ff9f447589aeb8d21e8c37e96933de10fed8d 100644 (file)
@@ -2224,6 +2224,132 @@ def test_ps_s3_persistent_notification_pushback():
     http_server.close()
 
 
+@attr('manual_test')
+def test_ps_s3_notification_kafka_idle_behaviour():
+    """ test pushing kafka s3 notification idle behaviour check """
+    # TODO convert this test to actual running test by changing
+    # os.system call to verify the process idleness
+    return SkipTest("only used in manual testing")
+    conn = connection()
+    zonegroup = 'default'
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    # name is constant for manual testing
+    topic_name = bucket_name+'_topic'
+    # create consumer on the topic
+
+    task, receiver = create_kafka_receiver_thread(topic_name+'_1')
+    task.start()
+
+    # create s3 topic
+    endpoint_address = 'kafka://' + kafka_server
+    # with acks from broker
+    endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+    topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+    topic_arn1 = topic_conf1.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name + '_1',
+                        'TopicArn': topic_arn1,
+                        'Events': []}]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    etags = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        content = str(os.urandom(1024*1024))
+        etag = hashlib.md5(content.encode()).hexdigest()
+        etags.append(etag)
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    keys = list(bucket.list())
+    receiver.verify_s3_events(keys, exact_match=True, etags=etags)
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
+
+    print('waiting 40sec (> max_idle_time of 30s) for the connection to become idle and be dropped')
+    time.sleep(40)
+
+    os.system("netstat -nnp | grep 9092")  # manual check: the established connection to the broker should be gone
+
+    # upload objects and verify notifications again; a fresh connection should be created
+    number_of_objects = 10
+    client_threads = []
+    etags = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key(str(i))
+        content = str(os.urandom(1024*1024))
+        etag = hashlib.md5(content.encode()).hexdigest()
+        etags.append(etag)
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    keys = list(bucket.list())
+    receiver.verify_s3_events(keys, exact_match=True, etags=etags)
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    for key in bucket.list():
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    time_diff = time.time() - start_time
+    print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
+
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf1.del_config()
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    stop_kafka_receiver(receiver, task)
+
+
 @attr('modification_required')
 def test_ps_s3_persistent_gateways_recovery():
     """ test gateway recovery of persistent notifications """