new_num_shards = random.randint(2, 10)
default_num_shards = 11
persistent_notification_shard_config_change('kafka', conn, new_num_shards, default_num_shards)
+
+
+@pytest.mark.basic_test
+def test_ps_s3_x_amz_request_id_on_master():
+ """ test that the x-amz-request-id in the notification event matches
+ the x-amz-request-id returned in the S3 put_object and delete_object responses.
+ Uses a persistent topic with a wrong endpoint so events stay in the queue,
+ then uses 'topic dump' to inspect the event entries. """
+ conn = connection()
+ zonegroup = get_config_zonegroup()
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic with a wrong endpoint so events are queued and not delivered
+ endpoint_address = 'http://WrongHost:1234'
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+
+ # create s3 notification for creation and deletion events
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{
+ 'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert status // 100 == 2
+
+ # use boto3 client for S3 operations to capture x-amz-request-id from responses
+ client = boto3.client(
+ 's3',
+ endpoint_url='http://' + conn.host + ':' + str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key,
+ )
+
+ # put object and capture x-amz-request-id from the response
+ key_name = 'test_request_id_key'
+ put_response = client.put_object(Bucket=bucket_name, Key=key_name, Body=b'hello world')
+ put_request_id = put_response['ResponseMetadata']['HTTPHeaders'].get('x-amz-request-id', '')
+ assert put_request_id, "put_object response missing x-amz-request-id"
+
+ # delete object and capture x-amz-request-id from the response
+ delete_response = client.delete_object(Bucket=bucket_name, Key=key_name)
+ delete_request_id = delete_response['ResponseMetadata']['HTTPHeaders'].get('x-amz-request-id', '')
+ assert delete_request_id, "delete_object response missing x-amz-request-id"
+
+ # the put and delete request ids should be different
+ assert put_request_id != delete_request_id
+
+ # wait for events to be queued
+ time.sleep(5)
+
+ # dump the persistent topic entries and verify x-amz-request-id
+ result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
+ assert result[1] == 0
+ parsed_result = json.loads(result[0])
+ assert len(parsed_result) == 2
+
+ # find creation and deletion events from the dump
+ # note: topic dump eventName does not have the 's3:' prefix
+ creation_event = None
+ deletion_event = None
+ for entry in parsed_result:
+ event = entry['entry']['event']
+ if event['eventName'].startswith('ObjectCreated'):
+ creation_event = event
+ elif event['eventName'].startswith('ObjectRemoved'):
+ deletion_event = event
+
+ assert creation_event is not None, "creation event not found in topic dump"
+ assert deletion_event is not None, "deletion event not found in topic dump"
+
+ # verify creation event x-amz-request-id matches the put_object response
+ assert creation_event['responseElements']['x-amz-request-id'] == put_request_id
+
+ # verify deletion event x-amz-request-id matches the delete_object response
+ assert deletion_event['responseElements']['x-amz-request-id'] == delete_request_id
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ conn.delete_bucket(bucket_name)