const boost::optional<std::string> ca_location;
const std::string user;
const std::string password;
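+ // time of last use; refreshed whenever the connection publishes a message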
+ utime_t timestamp = ceph_clock_now();
// cleanup of all internal connection resources
// the object can still remain, and internal connection
const size_t max_connections;
const size_t max_inflight;
const size_t max_queue;
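+ // maximum time (seconds) a connection may stay idle before it is removed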
+ const size_t max_idle_time;
private:
std::atomic<size_t> connection_count;
bool stopped;
const std::unique_ptr<message_wrapper_t> msg_owner(message);
auto& conn = message->conn;
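+ // refresh the connection's last-use timestamp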
+ conn->timestamp = ceph_clock_now();
+
if (!conn->is_ok()) {
// connection had an issue while message was in the queue
// TODO add error stats
auto& conn = conn_it->second;
+ // check whether the connection has been idle for too long
+ if (conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
+ ldout(conn->cct, 20) << "Kafka run: deleting connection due to idleness: " << ceph_clock_now() << dendl;
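+ // remove the idle connection and continue to the next one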
+ ERASE_AND_CONTINUE(conn_it, connections);
+ }
+
// try to reconnect the connection if it has an error
if (!conn->is_ok()) {
ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl;
max_connections(_max_connections),
max_inflight(_max_inflight),
max_queue(_max_queue),
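+ // idle timeout in seconds (hardcoded for now)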
+ max_idle_time(30),
connection_count(0),
stopped(false),
read_timeout_ms(_read_timeout_ms),
http_server.close()
+@attr('manual_test')
+def test_ps_s3_notification_kafka_idle_behaviour():
+ """ test pushing kafka s3 notification idle behaviour check """
+ # TODO convert this test to actual running test by changing
+ # os.system call to verify the process idleness
+ return SkipTest("only used in manual testing")
+ conn = connection()
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ # topic name is derived from the bucket name, kept constant for manual testing
+ topic_name = bucket_name+'_topic'
+ # create consumer on the topic
+
+ task, receiver = create_kafka_receiver_thread(topic_name+'_1')
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'kafka://' + kafka_server
+ # with acks from broker
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+ topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ etags = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ etag = hashlib.md5(content.encode()).hexdigest()
+ etags.append(etag)
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ keys = list(bucket.list())
+ receiver.verify_s3_events(keys, exact_match=True, etags=etags)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
+
+ print('waiting 40sec so the 30sec idle timeout expires')
+ time.sleep(40)
+
+ os.system("netstat -nnp | grep 9092");
+
+ # upload objects and check notifications again; a new connection should be created
+ number_of_objects = 10
+ client_threads = []
+ etags = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ etag = hashlib.md5(content.encode()).hexdigest()
+ etags.append(etag)
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ keys = list(bucket.list())
+ receiver.verify_s3_events(keys, exact_match=True, etags=etags)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf1.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ stop_kafka_receiver(receiver, task)
+
+
@attr('modification_required')
def test_ps_s3_persistent_gateways_recovery():
""" test gateway recovery of persistent notifications """