From: Krunal Chheda Date: Wed, 20 May 2026 18:14:22 +0000 (-0400) Subject: rgw/notification: fix zero eventTime in bucket notifications on concurrent PUT race X-Git-Tag: v21.0.1~25^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0064cc00eeb2911c4b9cc417a34b026a70e9d1ad;p=ceph.git rgw/notification: fix zero eventTime in bucket notifications on concurrent PUT race When concurrent PUTs target the same object key, RADOS may return -ECANCELED to the losing writers. In that path *meta.mtime was never populated from meta.set_mtime, leaving mtime at epoch (zero), which propagated into bucket notification eventTime as "1970-01-01T00:00:00.000Z". Fix: set *meta.mtime from meta.set_mtime before returning 0 in the ECANCELED/ENOENT/EEXIST early-return block, matching the behaviour of the successful write path. Also add a regression test that fires 20 concurrent threads writing the same key and asserts no event in the persistent queue carries a zero eventTime. Fixes: https://tracker.ceph.com/issues/76729 Signed-off-by: Krunal Chheda --- diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 936a1109b55..ed1b72313d7 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -3610,6 +3610,9 @@ done_cancel: */ if (meta.if_match == NULL && meta.if_nomatch == NULL) { if (r == -ECANCELED || r == -ENOENT || r == -EEXIST) { + if (meta.mtime) { + *meta.mtime = meta.set_mtime; + } r = 0; } } else { diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 459211af2c3..fa5fa5b941e 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -3505,6 +3505,79 @@ def test_persistent_topic_dump(): receiver.close(task) +@pytest.mark.basic_test +def test_ps_s3_notification_concurrent_put_eventtime(): + """ test that eventTime is never zero when concurrent PUTs race on same key """ + conn = connection() + zonegroup = get_config_zonegroup() + + host = get_ip() + port = random.randint(10000, 20000) + + bucket_name = gen_bucket_name() + bucket = conn.create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # persistent topic with unreachable endpoint so events stay in queue + endpoint_address = 'http://' + host + ':' + str(port) + endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + \ + '&retry_sleep_duration=100' + topic_conf = PSTopicS3(conn, topic_name, zonegroup, + endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() + + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, + 'Events': ['s3:ObjectCreated:*']}] + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert status // 100 == 2 + + # concurrent PUTs to the SAME key to trigger RADOS-level ECANCELED race + num_threads = 20 + num_rounds = 5 + key_name = 'race-target' + + for round_num in range(num_rounds): + client_threads = [] + for i in range(num_threads): + key = bucket.new_key(key_name) + content = str(os.urandom(128)) + str(round_num) + str(i) + thr = threading.Thread(target=set_contents_from_string, + args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + + time.sleep(2) + result = admin(['topic', 'dump', '--topic', topic_name], + get_config_cluster()) + assert result[1] == 0 + parsed_result = json.loads(result[0]) + log.info(f'topic dump has {len(parsed_result)} events') + assert len(parsed_result) > 0, 'expected at least one notification event in queue' + + zero_time_count = 0 + for entry in parsed_result: + event = entry.get('entry', {}).get('event', {}) + event_time = event.get('eventTime', '') + if event_time in ('0.000000', '1970-01-01T00:00:00.000Z', + '1970-01-01T00:00:00.000000Z', ''): + zero_time_count += 1 + log.info(f'FOUND zero eventTime: ' + f'key={event.get("s3", {}).get("object", {}).get("key", "?")} ' + f'eventTime={event_time}') + + s3_notification_conf.del_config() + topic_conf.del_config() + for key in bucket.list(): + key.delete() + conn.delete_bucket(bucket_name) + + assert zero_time_count == 0, \ + f'{zero_time_count} out of {len(parsed_result)} events had zero eventTime' + + def persistent_topic_configs(persistency_time, config_dict): # create connection with no retries conn = connection(no_retries=True)