]> git.apps.os.sepia.ceph.com Git - s3-tests.git/commitdiff
bucket-logging: tests for the ACLRequired field
authorN Balachandran <nithya.balachandran@ibm.com>
Thu, 3 Jul 2025 10:10:00 +0000 (15:40 +0530)
committerYuval Lifshitz <ylifshit@ibm.com>
Wed, 30 Jul 2025 11:03:11 +0000 (11:03 +0000)
Tests that the AclRequired field is set correctly in the bucket logging record.

Signed-off-by: N Balachandran <nithya.balachandran@ibm.com>
(cherry picked from commit 0616754ea56fbe70f1ad3e703d9905162933c77e)

s3tests_boto3/functional/test_s3.py

index 745e37b0c3b00d32c15e1a5c773a3d36cdb58f8a..852c43fd72e6d9cc3b22c51ed25e5961b6b1f603 100644 (file)
@@ -14894,6 +14894,17 @@ def _verify_records(records, bucket_name, event_type, src_keys, record_type, exp
         return len(keys_found) == expected_count and len(keys_found) == len(all_keys)
     return len(keys_found) == expected_count
 
+def _verify_record_field(records, bucket_name, event_type, object_key, record_type, field_name, expected_value):
+    for record in iter(records.splitlines()):
+        if bucket_name in record and event_type in record and object_key in record:
+            parsed_record = _parse_log_record(record, record_type)
+            logger.info('bucket log record: %s', json.dumps(parsed_record, indent=4))
+            try:
+                value = parsed_record[field_name]
+                return expected_value == value
+            except KeyError:
+                return False
+    return False
 
 def randcontent():
     letters = string.ascii_lowercase
@@ -15287,6 +15298,151 @@ def _bucket_logging_key_filter(log_type):
         assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', names, log_type, expected_count, exact_match=True)
 
 
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_bucket_logging_bucket_acl_required():
+    src_bucket_name = get_new_bucket_name()
+    src_bucket = get_new_bucket_resource(name=src_bucket_name)
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    prefix = 'log/'
+    key = 'my-test-object'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
+
+    client.put_object(Bucket=src_bucket_name, Key=key, Body=randcontent())
+    client.put_bucket_acl(ACL='public-read',Bucket=src_bucket_name)
+
+    response = client.list_objects_v2(Bucket=src_bucket_name)
+    keys = _get_keys(response)
+
+    # minimal configuration
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    #This will require ACLs
+    alt_client = get_alt_client()
+    response = alt_client.list_objects_v2(Bucket=src_bucket_name)
+    _flush_logs(client, src_bucket_name)
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    log_keys = _get_keys(response)
+    assert len(log_keys) == 1
+
+    for log_key in log_keys:
+        response = client.get_object(Bucket=log_bucket_name, Key=log_key)
+        body = _get_body(response)
+        assert _verify_records(body, src_bucket_name, 'REST.GET.BUCKET', ["-"], "Standard", 1)
+        assert _verify_record_field(body, src_bucket_name, 'REST.GET.BUCKET', "-", "Standard", "ACLRequired", "Yes")
+
+    #Set a bucket policy that will allow access without requiring ACLs
+    resource1 = "arn:aws:s3:::" + src_bucket_name
+    policy_document = json.dumps(
+    {
+        "Version": "2012-10-17",
+        "Statement": [{
+        "Effect": "Allow",
+        "Principal": {"AWS": "*"},
+        "Action": "s3:ListBucket",
+        "Resource": [
+            "{}".format(resource1),
+          ]
+        }]
+     })
+
+    response = client.put_bucket_policy(Bucket=src_bucket_name, Policy=policy_document)
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 204
+
+    response = alt_client.list_objects_v2(Bucket=src_bucket_name)
+    _flush_logs(client, src_bucket_name)
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    log_keys = _get_keys(response)
+
+    log_key = log_keys[-1]
+    response = client.get_object(Bucket=log_bucket_name, Key=log_key)
+    body = _get_body(response)
+    assert _verify_records(body, src_bucket_name, 'REST.GET.BUCKET', ["-"], "Standard", 1)
+    assert _verify_record_field(body, src_bucket_name, 'REST.GET.BUCKET', "-", "Standard", "ACLRequired", "-")
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_bucket_logging_object_acl_required():
+    src_bucket_name = get_new_bucket_name()
+    src_bucket = get_new_bucket_resource(name=src_bucket_name)
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    prefix = 'log/'
+    key = 'my-test-object'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
+
+    client.put_object(Bucket=src_bucket_name, Key=key, Body=randcontent())
+    client.put_object_acl(ACL='public-read',Bucket=src_bucket_name, Key=key)
+
+    response = client.list_objects_v2(Bucket=src_bucket_name)
+    keys = _get_keys(response)
+
+    # minimal configuration
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    #This will require ACLs
+    alt_client = get_alt_client()
+    response = alt_client.get_object(Bucket=src_bucket_name, Key=key)
+    _flush_logs(client, src_bucket_name)
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    log_keys = _get_keys(response)
+    assert len(log_keys) == 1
+
+    for log_key in log_keys:
+        response = client.get_object(Bucket=log_bucket_name, Key=log_key)
+        body = _get_body(response)
+        assert _verify_records(body, src_bucket_name, 'REST.GET.OBJECT', [key], "Standard", 1)
+        assert _verify_record_field(body, src_bucket_name, 'REST.GET.OBJECT', key, "Standard", "ACLRequired", "Yes")
+
+    #Set a bucket policy that will allow access to the object without requiring ACLs
+    resource1 = "arn:aws:s3:::" + src_bucket_name
+    resource2 = "arn:aws:s3:::" + src_bucket_name + "/*"
+    policy_document = json.dumps(
+    {
+        "Version": "2012-10-17",
+        "Statement": [{
+        "Effect": "Allow",
+        "Principal": {"AWS": "*"},
+        "Action": "s3:GetObject",
+        "Resource": [
+            "{}".format(resource1),
+            "{}".format(resource2)
+          ]
+        }]
+     })
+
+    response = client.put_bucket_policy(Bucket=src_bucket_name, Policy=policy_document)
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 204
+
+    response = alt_client.get_object(Bucket=src_bucket_name, Key=key)
+    _flush_logs(client, src_bucket_name)
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    log_keys = _get_keys(response)
+
+    log_key = log_keys[-1]
+    response = client.get_object(Bucket=log_bucket_name, Key=log_key)
+    body = _get_body(response)
+    assert _verify_records(body, src_bucket_name, 'REST.GET.OBJECT', [key], "Standard", 1)
+    assert _verify_record_field(body, src_bucket_name, 'REST.GET.OBJECT', key, "Standard", "ACLRequired", "-")
+
+
+
 @pytest.mark.bucket_logging
 @pytest.mark.fails_on_aws
 def test_bucket_logging_key_filter_s():