receiver.close(task)
+@pytest.mark.basic_test
+def test_ps_s3_notification_concurrent_put_eventtime():
+ """ test that eventTime is never zero when concurrent PUTs race on same key """
+ conn = connection()
+ zonegroup = get_config_zonegroup()
+
+ host = get_ip()
+ port = random.randint(10000, 20000)
+
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # persistent topic with unreachable endpoint so events stay in queue
+ endpoint_address = 'http://' + host + ':' + str(port)
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + \
+ '&retry_sleep_duration=100'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup,
+ endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*']}]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert status // 100 == 2
+
+ # concurrent PUTs to the SAME key to trigger RADOS-level ECANCELED race
+ num_threads = 20
+ num_rounds = 5
+ key_name = 'race-target'
+
+ for round_num in range(num_rounds):
+ client_threads = []
+ for i in range(num_threads):
+ key = bucket.new_key(key_name)
+ content = str(os.urandom(128)) + str(round_num) + str(i)
+ thr = threading.Thread(target=set_contents_from_string,
+ args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time.sleep(2)
+ result = admin(['topic', 'dump', '--topic', topic_name],
+ get_config_cluster())
+ assert result[1] == 0
+ parsed_result = json.loads(result[0])
+ log.info(f'topic dump has {len(parsed_result)} events')
+ assert len(parsed_result) > 0, 'expected at least one notification event in queue'
+
+ zero_time_count = 0
+ for entry in parsed_result:
+ event = entry.get('entry', {}).get('event', {})
+ event_time = event.get('eventTime', '')
+ if event_time in ('0.000000', '1970-01-01T00:00:00.000Z',
+ '1970-01-01T00:00:00.000000Z', ''):
+ zero_time_count += 1
+ log.info(f'FOUND zero eventTime: '
+ f'key={event.get("s3", {}).get("object", {}).get("key", "?")} '
+ f'eventTime={event_time}')
+
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ for key in bucket.list():
+ key.delete()
+ conn.delete_bucket(bucket_name)
+
+ assert zero_time_count == 0, \
+ f'{zero_time_count} out of {len(parsed_result)} events had zero eventTime'
+
+
def persistent_topic_configs(persistency_time, config_dict):
# create connection with no retries
conn = connection(no_retries=True)