return len(keys_found) == expected_count and len(keys_found) == len(all_keys)
return len(keys_found) == expected_count
+def _verify_record_field(records, bucket_name, event_type, object_key, record_type, field_name, expected_value):
+ for record in iter(records.splitlines()):
+ if bucket_name in record and event_type in record and object_key in record:
+ parsed_record = _parse_log_record(record, record_type)
+ logger.info('bucket log record: %s', json.dumps(parsed_record, indent=4))
+ try:
+ value = parsed_record[field_name]
+ return expected_value == value
+ except KeyError:
+ return False
+ return False
def randcontent():
letters = string.ascii_lowercase
assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', names, log_type, expected_count, exact_match=True)
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_bucket_logging_bucket_acl_required():
+ src_bucket_name = get_new_bucket_name()
+ src_bucket = get_new_bucket_resource(name=src_bucket_name)
+ log_bucket_name = get_new_bucket_name()
+ log_bucket = get_new_bucket_resource(name=log_bucket_name)
+ client = get_client()
+ prefix = 'log/'
+ key = 'my-test-object'
+ _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
+
+ client.put_object(Bucket=src_bucket_name, Key=key, Body=randcontent())
+ client.put_bucket_acl(ACL='public-read',Bucket=src_bucket_name)
+
+ response = client.list_objects_v2(Bucket=src_bucket_name)
+ keys = _get_keys(response)
+
+ # minimal configuration
+ logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
+ response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+ 'LoggingEnabled': logging_enabled,
+ })
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+ #This will require ACLs
+ alt_client = get_alt_client()
+ response = alt_client.list_objects_v2(Bucket=src_bucket_name)
+ _flush_logs(client, src_bucket_name)
+
+ response = client.list_objects_v2(Bucket=log_bucket_name)
+ log_keys = _get_keys(response)
+ assert len(log_keys) == 1
+
+ for log_key in log_keys:
+ response = client.get_object(Bucket=log_bucket_name, Key=log_key)
+ body = _get_body(response)
+ assert _verify_records(body, src_bucket_name, 'REST.GET.BUCKET', ["-"], "Standard", 1)
+ assert _verify_record_field(body, src_bucket_name, 'REST.GET.BUCKET', "-", "Standard", "ACLRequired", "Yes")
+
+ #Set a bucket policy that will allow access without requiring ACLs
+ resource1 = "arn:aws:s3:::" + src_bucket_name
+ policy_document = json.dumps(
+ {
+ "Version": "2012-10-17",
+ "Statement": [{
+ "Effect": "Allow",
+ "Principal": {"AWS": "*"},
+ "Action": "s3:ListBucket",
+ "Resource": [
+ "{}".format(resource1),
+ ]
+ }]
+ })
+
+ response = client.put_bucket_policy(Bucket=src_bucket_name, Policy=policy_document)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 204
+
+ response = alt_client.list_objects_v2(Bucket=src_bucket_name)
+ _flush_logs(client, src_bucket_name)
+
+ response = client.list_objects_v2(Bucket=log_bucket_name)
+ log_keys = _get_keys(response)
+
+ log_key = log_keys[-1]
+ response = client.get_object(Bucket=log_bucket_name, Key=log_key)
+ body = _get_body(response)
+ assert _verify_records(body, src_bucket_name, 'REST.GET.BUCKET', ["-"], "Standard", 1)
+ assert _verify_record_field(body, src_bucket_name, 'REST.GET.BUCKET', "-", "Standard", "ACLRequired", "-")
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_bucket_logging_object_acl_required():
+ src_bucket_name = get_new_bucket_name()
+ src_bucket = get_new_bucket_resource(name=src_bucket_name)
+ log_bucket_name = get_new_bucket_name()
+ log_bucket = get_new_bucket_resource(name=log_bucket_name)
+ client = get_client()
+ prefix = 'log/'
+ key = 'my-test-object'
+ _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
+
+ client.put_object(Bucket=src_bucket_name, Key=key, Body=randcontent())
+ client.put_object_acl(ACL='public-read',Bucket=src_bucket_name, Key=key)
+
+ response = client.list_objects_v2(Bucket=src_bucket_name)
+ keys = _get_keys(response)
+
+ # minimal configuration
+ logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
+ response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+ 'LoggingEnabled': logging_enabled,
+ })
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+ #This will require ACLs
+ alt_client = get_alt_client()
+ response = alt_client.get_object(Bucket=src_bucket_name, Key=key)
+ _flush_logs(client, src_bucket_name)
+
+ response = client.list_objects_v2(Bucket=log_bucket_name)
+ log_keys = _get_keys(response)
+ assert len(log_keys) == 1
+
+ for log_key in log_keys:
+ response = client.get_object(Bucket=log_bucket_name, Key=log_key)
+ body = _get_body(response)
+ assert _verify_records(body, src_bucket_name, 'REST.GET.OBJECT', [key], "Standard", 1)
+ assert _verify_record_field(body, src_bucket_name, 'REST.GET.OBJECT', key, "Standard", "ACLRequired", "Yes")
+
+ #Set a bucket policy that will allow access to the object without requiring ACLs
+ resource1 = "arn:aws:s3:::" + src_bucket_name
+ resource2 = "arn:aws:s3:::" + src_bucket_name + "/*"
+ policy_document = json.dumps(
+ {
+ "Version": "2012-10-17",
+ "Statement": [{
+ "Effect": "Allow",
+ "Principal": {"AWS": "*"},
+ "Action": "s3:GetObject",
+ "Resource": [
+ "{}".format(resource1),
+ "{}".format(resource2)
+ ]
+ }]
+ })
+
+ response = client.put_bucket_policy(Bucket=src_bucket_name, Policy=policy_document)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 204
+
+ response = alt_client.get_object(Bucket=src_bucket_name, Key=key)
+ _flush_logs(client, src_bucket_name)
+
+ response = client.list_objects_v2(Bucket=log_bucket_name)
+ log_keys = _get_keys(response)
+
+ log_key = log_keys[-1]
+ response = client.get_object(Bucket=log_bucket_name, Key=log_key)
+ body = _get_body(response)
+ assert _verify_records(body, src_bucket_name, 'REST.GET.OBJECT', [key], "Standard", 1)
+ assert _verify_record_field(body, src_bucket_name, 'REST.GET.OBJECT', key, "Standard", "ACLRequired", "-")
+
+
+
@pytest.mark.bucket_logging
@pytest.mark.fails_on_aws
def test_bucket_logging_key_filter_s():