get_alt_email,
get_alt_client,
get_iam_root_client,
+ get_iam_root_s3client,
get_tenant_client,
get_v2_tenant_client,
get_tenant_iam_client,
_bucket_logging_permission_change('Journal')
-def _bucket_logging_tenant_objects(src_client, src_bucket_name, log_client, log_bucket_name, log_type, op_name):
+def _bucket_logging_objects(src_client, src_bucket_name, log_client, log_bucket_name, log_type, op_name):
num_keys = 5
for j in range(num_keys):
name = 'myobject'+str(j)
'LoggingEnabled': logging_enabled,
})
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
- _bucket_logging_tenant_objects(client, src_bucket_name, tenant_client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
+ _bucket_logging_objects(client, src_bucket_name, tenant_client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
# src is on default tenant and log is on a different tenant with the same name
src_bucket_name = get_new_bucket_name()
'LoggingEnabled': logging_enabled,
})
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
- _bucket_logging_tenant_objects(client, src_bucket_name, tenant_client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
+ _bucket_logging_objects(client, src_bucket_name, tenant_client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
try:
'LoggingEnabled': logging_enabled,
})
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
- _bucket_logging_tenant_objects(client, src_bucket_name, client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
+ _bucket_logging_objects(client, src_bucket_name, client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
# src and log are on the same tenant
# log bucket name is set without tenant
'LoggingEnabled': logging_enabled,
})
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
- _bucket_logging_tenant_objects(client, src_bucket_name, client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
+ _bucket_logging_objects(client, src_bucket_name, client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
# src is on tenant and log is on the default tenant
# log bucket name is set with explicit default tenant
'LoggingEnabled': logging_enabled,
})
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
- _bucket_logging_tenant_objects(client, src_bucket_name, log_client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
+ _bucket_logging_objects(client, src_bucket_name, log_client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
try:
# src is on tenant and log is on the default tenant
_put_bucket_logging_tenant('Journal')
+def _put_bucket_logging_account(log_type):
+ # src is default user and log is in an account user
+ src_bucket_name = get_new_bucket_name()
+ src_bucket = get_new_bucket_resource(name=src_bucket_name)
+ log_bucket_name = get_new_bucket_name()
+ log_client = get_iam_root_s3client()
+ log_bucket = get_new_bucket(client=log_client, name=log_bucket_name)
+ client = get_client()
+ prefix = 'log/'
+ _set_log_bucket_policy_tenant(log_client, "", log_bucket_name, "", get_main_user_id(), [src_bucket_name], [prefix])
+ logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
+ if log_type == 'Journal':
+ logging_enabled['LoggingType'] = 'Journal'
+ response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+ 'LoggingEnabled': logging_enabled,
+ })
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ _bucket_logging_objects(client, src_bucket_name, log_client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
+
+ # src and log are in an iam account
+ client = get_iam_root_s3client()
+ iam_client = get_iam_root_client()
+ iam_user = iam_client.get_user()['User']['Arn'].split(':')[-2] # Get the IAM user name from ARN
+ src_bucket_name = get_new_bucket_name()
+ src_bucket = get_new_bucket(client=client, name=src_bucket_name)
+ log_bucket_name = get_new_bucket_name()
+ log_bucket = get_new_bucket(client=client, name=log_bucket_name)
+ _set_log_bucket_policy_tenant(client, "", log_bucket_name, "", iam_user, [src_bucket_name], [prefix])
+ logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
+ if log_type == 'Journal':
+ logging_enabled['LoggingType'] = 'Journal'
+ response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+ 'LoggingEnabled': logging_enabled,
+ })
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ _bucket_logging_objects(client, src_bucket_name, client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
+
+ # src is in an iam account and log is the default user
+ client = get_iam_root_s3client()
+ iam_client = get_iam_root_client()
+ iam_user = iam_client.get_user()['User']['Arn'].split(':')[-2] # Get the IAM user name from ARN
+ src_bucket_name = get_new_bucket_name()
+ src_bucket = get_new_bucket(client=client, name=src_bucket_name)
+ log_client = get_client()
+ log_bucket_name = get_new_bucket_name()
+ log_bucket = get_new_bucket(client=log_client, name=log_bucket_name)
+ _set_log_bucket_policy_tenant(log_client, "", log_bucket_name, "", iam_user, [src_bucket_name], [prefix])
+ logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+ if log_type == 'Journal':
+ logging_enabled['LoggingType'] = 'Journal'
+ response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+ 'LoggingEnabled': logging_enabled,
+ })
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ _bucket_logging_objects(client, src_bucket_name, log_client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_put_bucket_logging_account_s():
+ _put_bucket_logging_account('Standard')
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_put_bucket_logging_account_j():
+ if not _has_bucket_logging_extension():
+ pytest.skip('ceph extension to bucket logging not supported at client')
+ _put_bucket_logging_account('Journal')
+
+
@pytest.mark.bucket_logging
def test_rm_bucket_logging():
src_bucket_name = get_new_bucket_name()