]> git-server-git.apps.pok.os.sepia.ceph.com Git - s3-tests.git/commitdiff
rgw/logging: add basic policy to target bucket
authorYuval Lifshitz <ylifshit@ibm.com>
Wed, 26 Mar 2025 13:52:16 +0000 (13:52 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Thu, 27 Mar 2025 12:25:53 +0000 (12:25 +0000)
this commit needed to be able to run bucket logging regression
against: https://github.com/ceph/ceph/pull/62284
since target bucket requires policy for bucket logging to work
this only covers the positive cases from bucket logging policy
perspective

Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
s3tests_boto3/functional/test_s3.py

index 978dc1957347d5f43b1463237004cfd57841d08f..19c23ba6b61cb6f84cfce9c64a091252f7ff71db 100644 (file)
@@ -14473,35 +14473,75 @@ def test_post_object_upload_checksum():
     r = requests.post(url, files=payload, verify=get_config_ssl_verify())
     assert r.status_code == 400
 
+
+def _set_log_bucket_policy_tenant(client, log_tenant, log_bucket_name, src_tenant, src_user, src_buckets, log_prefixes):
+    statements = []
+    for j in range(len(src_buckets)):
+        if len(log_prefixes) == 1:
+            prefix = log_prefixes[0]
+        else:
+            prefix = log_prefixes[j]
+        statements.append({
+            "Sid": "S3ServerAccessLogsPolicy",
+            "Effect": "Allow",
+            "Principal": {"Service": "logging.s3.amazonaws.com"},
+            "Action": ["s3:PutObject"],
+            "Resource": "arn:aws:s3::{}:{}/{}".format(log_tenant, log_bucket_name, prefix),
+            "Condition": {
+                "ArnLike": {"aws:SourceArn": "arn:aws:s3::{}:{}".format(src_tenant, src_buckets[j])},
+                "StringEquals": {
+                    "aws:SourceAccount": "{}${}".format(src_tenant, src_user) if src_tenant else src_user
+                    }
+            }
+        })
+
+    policy_document = json.dumps({
+        "Version": "2012-10-17",
+        "Statement": statements
+        })
+
+    result = client.put_bucket_policy(Bucket=log_bucket_name, Policy=policy_document)
+    assert(result['ResponseMetadata']['HTTPStatusCode'] == 204)
+
+
+def _set_log_bucket_policy(client, log_bucket_name, src_buckets, log_prefixes):
+    _set_log_bucket_policy_tenant(client, "", log_bucket_name, "", get_main_user_id(), src_buckets, log_prefixes)
+
+
 def _has_bucket_logging_extension():
     src_bucket_name = get_new_bucket_name()
-    src_bucket = get_new_bucket_resource(name=src_bucket_name)
     log_bucket_name = get_new_bucket_name()
-    log_bucket = get_new_bucket_resource(name=log_bucket_name)
     client = get_client()
-    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/', 'LoggingType': 'Journal'}
+    log_prefix = 'log/'
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': log_prefix, 'LoggingType': 'Journal'}
     try:
         response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
             'LoggingEnabled': logging_enabled,
         })
     except ParamValidationError as e:
         return False
+    except ClientError as e:
+        # should fail on non-existing bucket
+        return True
+
     return True
 
+
 def _has_taget_object_key_format():
     src_bucket_name = get_new_bucket_name()
-    src_bucket = get_new_bucket_resource(name=src_bucket_name)
     log_bucket_name = get_new_bucket_name()
-    log_bucket = get_new_bucket_resource(name=log_bucket_name)
     client = get_client()
-    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/', 'TargetObjectKeyFormat': {'SimplePrefix': {}}}
+    log_prefix = 'log/'
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': log_prefix, 'TargetObjectKeyFormat': {'SimplePrefix': {}}}
     try:
         response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
             'LoggingEnabled': logging_enabled,
         })
     except ParamValidationError as e:
         return False
-    return True
+    finally:
+        # should fail on non-existing bucket
+        return True
 
 
 import shlex
@@ -14597,18 +14637,19 @@ def randcontent():
 
 @pytest.mark.bucket_logging
 def test_put_bucket_logging():
+    has_extensions = _has_bucket_logging_extension()
+    has_key_format = _has_taget_object_key_format()
     src_bucket_name = get_new_bucket_name()
     src_bucket = get_new_bucket_resource(name=src_bucket_name)
     log_bucket_name = get_new_bucket_name()
     log_bucket = get_new_bucket_resource(name=log_bucket_name)
     client = get_client()
-    has_extensions = _has_bucket_logging_extension()
-    has_key_format = _has_taget_object_key_format()
-    
+    prefix = 'log/'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
     # minimal configuration
     logging_enabled = {
-            'TargetBucket': log_bucket_name, 
-            'TargetPrefix': 'log/'
+            'TargetBucket': log_bucket_name,
+            'TargetPrefix': prefix
             }
 
     if has_extensions:
@@ -14626,11 +14667,11 @@ def test_put_bucket_logging():
         # default value for key prefix is returned
         logging_enabled['TargetObjectKeyFormat'] = {'SimplePrefix': {}}
     assert response['LoggingEnabled'] == logging_enabled
+
     if has_key_format:
         # with simple target object prefix
         logging_enabled = {
-            'TargetBucket': log_bucket_name, 
+            'TargetBucket': log_bucket_name,
             'TargetPrefix': 'log/',
             'TargetObjectKeyFormat': {
                 'SimplePrefix': {}
@@ -14648,10 +14689,10 @@ def test_put_bucket_logging():
             logging_enabled['LoggingType'] = 'Standard'
             logging_enabled['RecordsBatchSize'] = 0
         assert response['LoggingEnabled'] == logging_enabled
-        
+
         # with partitioned target object prefix
         logging_enabled = {
-            'TargetBucket': log_bucket_name, 
+            'TargetBucket': log_bucket_name,
             'TargetPrefix': 'log/',
             'TargetObjectKeyFormat': {
                 'PartitionedPrefix': {
@@ -14671,12 +14712,12 @@ def test_put_bucket_logging():
             logging_enabled['LoggingType'] = 'Standard'
             logging_enabled['RecordsBatchSize'] = 0
         assert response['LoggingEnabled'] == logging_enabled
-        
+
     # with target grant (not implemented in RGW)
     main_display_name = get_main_display_name()
     main_user_id = get_main_user_id()
     logging_enabled = {
-        'TargetBucket': log_bucket_name, 
+        'TargetBucket': log_bucket_name,
         'TargetPrefix': 'log/',
         'TargetGrants': [{'Grantee': {'DisplayName': main_display_name, 'ID': main_user_id,'Type': 'CanonicalUser'},'Permission': 'FULL_CONTROL'}] 
     }
@@ -14713,15 +14754,17 @@ def _bucket_logging_key_filter(log_type):
     log_bucket_name = get_new_bucket_name()
     log_bucket = get_new_bucket_resource(name=log_bucket_name)
     client = get_client()
-    
+    prefix = 'log/'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
+
     logging_enabled = {
-            'TargetBucket': log_bucket_name, 
+            'TargetBucket': log_bucket_name,
             'LoggingType': log_type,
-            'TargetPrefix': 'log/',
+            'TargetPrefix': prefix,
             'ObjectRollTime': expected_object_roll_time,
             'TargetObjectKeyFormat': {'SimplePrefix': {}},
             'RecordsBatchSize': 0,
-             'Filter': 
+             'Filter':
                 {
                     'Key': {
                         'FilterRules': [
@@ -14745,16 +14788,16 @@ def _bucket_logging_key_filter(log_type):
         print('TODO')
     else:
         assert False, 'unknown log type: %s' % log_type
-    
+
     names = []
     num_keys = 5
     for j in range(num_keys):
-        name = 'myobject'+str(j)    
+        name = 'myobject'+str(j)
         if log_type == 'Standard':
             # standard log records are not filtered
             names.append(name)
         client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
-   
+
     for j in range(num_keys):
         name = 'test/'+'myobject'+str(j)+'.txt'
         names.append(name)
@@ -14797,25 +14840,33 @@ def _bucket_logging_flush(logging_type, single_prefix, concurrency):
     log_bucket_name = get_new_bucket_name()
     log_bucket = get_new_bucket_resource(name=log_bucket_name)
     client = get_client()
-   
+
     num_buckets = 5
     buckets = []
+    log_prefixes = []
     longer_time = expected_object_roll_time*10
     for j in range(num_buckets):
         src_bucket_name = get_new_bucket_name()
         src_bucket = get_new_bucket_resource(name=src_bucket_name)
-        logging_enabled = {'TargetBucket': log_bucket_name, 
-                           'ObjectRollTime': longer_time, 'LoggingType': logging_type}
+        buckets.append(src_bucket_name)
         if single_prefix:
-            logging_enabled['TargetPrefix'] = 'log/'
+            log_prefixes.append('log/')
         else:
-            logging_enabled['TargetPrefix'] = src_bucket_name+'/'
+            log_prefixes.append(src_bucket_name+'/')
 
-        response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+    _set_log_bucket_policy(client, log_bucket_name, buckets, log_prefixes)
+
+    for j in range(num_buckets):
+        logging_enabled = {'TargetBucket': log_bucket_name,
+                           'ObjectRollTime': longer_time,
+                           'LoggingType': logging_type,
+                           'TargetPrefix': log_prefixes[j]
+                           }
+
+        response = client.put_bucket_logging(Bucket=buckets[j], BucketLoggingStatus={
             'LoggingEnabled': logging_enabled,
         })
         assert response['ResponseMetadata']['HTTPStatusCode'] == 200
-        buckets.append(src_bucket_name)
 
     num_keys = 10
     src_names = []
@@ -14834,7 +14885,7 @@ def _bucket_logging_flush(logging_type, single_prefix, concurrency):
     t = []
     for src_bucket_name in buckets:
         if concurrency:
-            thr = threading.Thread(target = client.post_bucket_logging, 
+            thr = threading.Thread(target = client.post_bucket_logging,
                                kwargs={'Bucket': src_bucket_name})
             thr.start()
             t.append(thr)
@@ -14845,7 +14896,7 @@ def _bucket_logging_flush(logging_type, single_prefix, concurrency):
             # because flushing itself will be logged
             # and the next flush will commit the log
             break
-    
+
     _do_wait_completion(t)
 
     response = client.list_objects_v2(Bucket=log_bucket_name)
@@ -14860,15 +14911,12 @@ def _bucket_logging_flush(logging_type, single_prefix, concurrency):
         response = client.get_object(Bucket=log_bucket_name, Key=key)
         body = _get_body(response)
         found = False
-        for src_bucket_name in buckets:
-            if single_prefix:
-                prefix = 'log/'
-            else:
-                prefix = src_bucket_name+'/'
+        for j in range(num_buckets):
+            prefix = log_prefixes[j]
             if key.startswith(prefix):
                 found = True
-                assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_names, logging_type, num_keys)
-                assert _verify_records(body, src_bucket_name, 'REST.DELETE.OBJECT', src_names, logging_type, num_keys)
+                assert _verify_records(body, buckets[j], 'REST.PUT.OBJECT', src_names, logging_type, num_keys)
+                assert _verify_records(body, buckets[j], 'REST.DELETE.OBJECT', src_names, logging_type, num_keys)
         assert found
 
 
@@ -14927,45 +14975,48 @@ def test_put_bucket_logging_errors():
     log_bucket_name1 = get_new_bucket_name()
     log_bucket1 = get_new_bucket_resource(name=log_bucket_name1)
     client = get_client()
-    
+    prefix = 'log/'
+
     # invalid source bucket
     try:
         response = client.put_bucket_logging(Bucket=src_bucket_name+'kaboom', BucketLoggingStatus={
-            'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': 'log/'},
+            'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': prefix},
         })
         assert False, 'expected failure'
     except ClientError as e:
         assert e.response['Error']['Code'] == 'NoSuchBucket'
-    
+
     # invalid log bucket
     try:
         response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
-            'LoggingEnabled': {'TargetBucket': log_bucket_name1+'kaboom', 'TargetPrefix': 'log/'},
+            'LoggingEnabled': {'TargetBucket': log_bucket_name1+'kaboom', 'TargetPrefix': prefix},
         })
         assert False, 'expected failure'
     except ClientError as e:
         assert e.response['Error']['Code'] == 'NoSuchKey'
-    
+
     # log bucket has bucket logging
     log_bucket_name2 = get_new_bucket_name()
     log_bucket2 = get_new_bucket_resource(name=log_bucket_name2)
+    _set_log_bucket_policy(client, log_bucket_name1, [log_bucket_name2], [prefix])
     response = client.put_bucket_logging(Bucket=log_bucket_name2, BucketLoggingStatus={
-        'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': 'log/'},
+        'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': prefix},
     })
     assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    _set_log_bucket_policy(client, log_bucket_name2, [src_bucket_name], [prefix])
     try:
         response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
-            'LoggingEnabled': {'TargetBucket': log_bucket_name2, 'TargetPrefix': 'log/'},
+            'LoggingEnabled': {'TargetBucket': log_bucket_name2, 'TargetPrefix': prefix},
         })
         assert False, 'expected failure'
     except ClientError as e:
         assert e.response['Error']['Code'] == 'InvalidArgument'
-    
+
     # invalid partition prefix
     if _has_taget_object_key_format():
         logging_enabled = {
-            'TargetBucket': log_bucket_name1, 
-            'TargetPrefix': 'log/',
+            'TargetBucket': log_bucket_name1,
+            'TargetPrefix': prefix,
             'TargetObjectKeyFormat': {
                 'PartitionedPrefix': {
                     'PartitionDateSource': 'kaboom'
@@ -14973,6 +15024,7 @@ def test_put_bucket_logging_errors():
             }
         }
         try:
+            _set_log_bucket_policy(client, log_bucket_name1, [src_bucket_name], [prefix])
             response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
                 'LoggingEnabled': logging_enabled,
             })
@@ -14981,19 +15033,20 @@ def test_put_bucket_logging_errors():
             assert e.response['Error']['Code'] == 'MalformedXML'
 
     # log bucket is the same as source bucket
+    _set_log_bucket_policy(client, src_bucket_name, [src_bucket_name], [prefix])
     try:
         response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
-            'LoggingEnabled': {'TargetBucket': src_bucket_name, 'TargetPrefix': 'log/'},
+            'LoggingEnabled': {'TargetBucket': src_bucket_name, 'TargetPrefix': prefix},
         })
         assert False, 'expected failure'
     except ClientError as e:
         assert e.response['Error']['Code'] == 'InvalidArgument'
-    
+
     # log bucket is encrypted
     _put_bucket_encryption_s3(client, log_bucket_name1)
     try:
         response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
-            'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': 'log/'},
+            'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': prefix},
         })
         assert False, 'expected failure'
     except ClientError as e:
@@ -15003,7 +15056,7 @@ def test_put_bucket_logging_errors():
     if _has_bucket_logging_extension():
         try:
             response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
-                'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': 'log/', 'LoggingType': 'kaboom'},
+                'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': prefix, 'LoggingType': 'kaboom'},
             })
             assert False, 'expected failure'
         except ClientError as e:
@@ -15013,7 +15066,7 @@ def test_put_bucket_logging_errors():
 def _bucket_logging_tenant_objects(src_client, src_bucket_name, log_client, log_bucket_name, log_type, op_name):
     num_keys = 5
     for j in range(num_keys):
-        name = 'myobject'+str(j)    
+        name = 'myobject'+str(j)
         src_client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
 
     expected_count = num_keys
@@ -15042,7 +15095,10 @@ def _put_bucket_logging_tenant(log_type):
     tenant_client = get_tenant_client()
     log_bucket = get_new_bucket(client=tenant_client, name=log_bucket_name)
     client = get_client()
-    logging_enabled = {'TargetBucket': get_tenant_name()+':'+log_bucket_name, 'TargetPrefix': 'log/'}
+    prefix = 'log/'
+    log_tenant = get_tenant_name()
+    _set_log_bucket_policy_tenant(tenant_client, log_tenant, log_bucket_name, "", get_main_user_id(), [src_bucket_name], [prefix])
+    logging_enabled = {'TargetBucket': log_tenant+':'+log_bucket_name, 'TargetPrefix': prefix}
     if log_type == 'Journal':
         logging_enabled['LoggingType'] = 'Journal'
     response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
@@ -15051,7 +15107,6 @@ def _put_bucket_logging_tenant(log_type):
     assert response['ResponseMetadata']['HTTPStatusCode'] == 200
     _bucket_logging_tenant_objects(client, src_bucket_name, tenant_client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
 
-
     # src is on default tenant and log is on a different tenant with the same name
     src_bucket_name = get_new_bucket_name()
     src_bucket = get_new_bucket_resource(name=src_bucket_name)
@@ -15059,7 +15114,9 @@ def _put_bucket_logging_tenant(log_type):
     tenant_client = get_tenant_client()
     log_bucket = get_new_bucket(client=tenant_client, name=log_bucket_name)
     client = get_client()
-    logging_enabled = {'TargetBucket': get_tenant_name()+':'+log_bucket_name, 'TargetPrefix': 'log/'}
+    tenant_name = get_tenant_name()
+    _set_log_bucket_policy_tenant(tenant_client, tenant_name, log_bucket_name, "", get_main_user_id(),  [src_bucket_name], [prefix])
+    logging_enabled = {'TargetBucket': tenant_name+':'+log_bucket_name, 'TargetPrefix': prefix}
     if log_type == 'Journal':
         logging_enabled['LoggingType'] = 'Journal'
     response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
@@ -15075,9 +15132,11 @@ def _put_bucket_logging_tenant(log_type):
         src_bucket_name = get_new_bucket_name()
         src_bucket = get_new_bucket_resource(name=src_bucket_name)
         log_bucket_name = get_new_bucket_name()
-        log_bucket = get_new_bucket(client=get_tenant_client(), name=log_bucket_name)
+        log_client = get_tenant_client()
+        log_bucket = get_new_bucket(client=log_client, name=log_bucket_name)
         client = get_client()
-        logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+        _set_log_bucket_policy_tenant(log_client, get_tenant_name(), log_bucket_name, "", get_main_user_id(), [src_bucket_name], [prefix])
+        logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
         if log_type == 'Journal':
             logging_enabled['LoggingType'] = 'Journal'
         response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
@@ -15095,7 +15154,9 @@ def _put_bucket_logging_tenant(log_type):
     src_bucket = get_new_bucket(client=client, name=src_bucket_name)
     log_bucket_name = get_new_bucket_name()
     log_bucket = get_new_bucket(client=client, name=log_bucket_name)
-    logging_enabled = {'TargetBucket': get_tenant_name()+':'+log_bucket_name, 'TargetPrefix': 'log/'}
+    tenant_name = get_tenant_name()
+    _set_log_bucket_policy_tenant(client, tenant_name, log_bucket_name, tenant_name, get_tenant_user_id(), [src_bucket_name], [prefix])
+    logging_enabled = {'TargetBucket': tenant_name+':'+log_bucket_name, 'TargetPrefix': prefix}
     if log_type == 'Journal':
         logging_enabled['LoggingType'] = 'Journal'
     response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
@@ -15103,7 +15164,7 @@ def _put_bucket_logging_tenant(log_type):
     })
     assert response['ResponseMetadata']['HTTPStatusCode'] == 200
     _bucket_logging_tenant_objects(client, src_bucket_name, client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
-    
+
     # src and log are on the same tenant
     # log bucket name is set without tenant
     client = get_tenant_client()
@@ -15111,7 +15172,9 @@ def _put_bucket_logging_tenant(log_type):
     src_bucket = get_new_bucket(client=client, name=src_bucket_name)
     log_bucket_name = get_new_bucket_name()
     log_bucket = get_new_bucket(client=client, name=log_bucket_name)
-    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    tenant_name = get_tenant_name()
+    _set_log_bucket_policy_tenant(client, tenant_name, log_bucket_name, tenant_name, get_tenant_user_id(), [src_bucket_name], [prefix])
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
     if log_type == 'Journal':
         logging_enabled['LoggingType'] = 'Journal'
     response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
@@ -15119,7 +15182,7 @@ def _put_bucket_logging_tenant(log_type):
     })
     assert response['ResponseMetadata']['HTTPStatusCode'] == 200
     _bucket_logging_tenant_objects(client, src_bucket_name, client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
-    
+
     # src is on tenant and log is on the default tenant
     # log bucket name is set with explicit default tenant
     client = get_tenant_client()
@@ -15128,6 +15191,7 @@ def _put_bucket_logging_tenant(log_type):
     log_client = get_client()
     log_bucket_name = get_new_bucket_name()
     log_bucket = get_new_bucket(client=log_client, name=log_bucket_name)
+    _set_log_bucket_policy_tenant(log_client, "", log_bucket_name, get_tenant_name(), get_tenant_user_id(), [src_bucket_name], [prefix])
     logging_enabled = {'TargetBucket': ':'+log_bucket_name, 'TargetPrefix': 'log/'}
     if log_type == 'Journal':
         logging_enabled['LoggingType'] = 'Journal'
@@ -15136,14 +15200,16 @@ def _put_bucket_logging_tenant(log_type):
     })
     assert response['ResponseMetadata']['HTTPStatusCode'] == 200
     _bucket_logging_tenant_objects(client, src_bucket_name, log_client, log_bucket_name, log_type, 'REST.PUT.OBJECT')
-    
+
     try:
         # src is on tenant and log is on the default tenant
         client = get_tenant_client()
         src_bucket_name = get_new_bucket_name()
         src_bucket = get_new_bucket(client=client, name=src_bucket_name)
         log_bucket_name = get_new_bucket_name()
-        log_bucket = get_new_bucket_resource(name=log_bucket_name)
+        log_client = get_client()
+        log_bucket = get_new_bucket(client=log_client, name=log_bucket_name)
+        _set_log_bucket_policy_tenant(log_client, "", log_bucket_name, get_tenant_name(), get_tenant_user_id(), [src_bucket_name], [prefix])
         logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
         if log_type == 'Journal':
             logging_enabled['LoggingType'] = 'Journal'
@@ -15177,7 +15243,9 @@ def test_rm_bucket_logging():
     log_bucket_name = get_new_bucket_name()
     log_bucket = get_new_bucket_resource(name=log_bucket_name)
     client = get_client()
-    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    prefix = 'log/'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
     response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
         'LoggingEnabled': logging_enabled,
     })
@@ -15185,7 +15253,7 @@ def test_rm_bucket_logging():
 
     response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={})
     assert response['ResponseMetadata']['HTTPStatusCode'] == 200
-    
+
     response = client.get_bucket_logging(Bucket=src_bucket_name)
     assert response['ResponseMetadata']['HTTPStatusCode'] == 200
     assert not 'LoggingEnabled' in response
@@ -15201,8 +15269,10 @@ def test_put_bucket_logging_extensions():
     log_bucket_name = get_new_bucket_name()
     log_bucket = get_new_bucket_resource(name=log_bucket_name)
     client = get_client()
-    logging_enabled = {'TargetBucket': log_bucket_name, 
-                       'TargetPrefix': 'log/', 
+    prefix = 'log/'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
+    logging_enabled = {'TargetBucket': log_bucket_name,
+                       'TargetPrefix': prefix,
                        'LoggingType': 'Standard',
                        'ObjectRollTime': expected_object_roll_time,
                        'RecordsBatchSize': 0
@@ -15211,7 +15281,7 @@ def test_put_bucket_logging_extensions():
         'LoggingEnabled': logging_enabled,
     })
     assert response['ResponseMetadata']['HTTPStatusCode'] == 200
-    
+
     response = client.get_bucket_logging(Bucket=src_bucket_name)
     assert response['ResponseMetadata']['HTTPStatusCode'] == 200
 
@@ -15227,9 +15297,11 @@ def _bucket_logging_put_objects(versioned):
     log_bucket = get_new_bucket_resource(name=log_bucket_name)
     client = get_client()
     has_extensions = _has_bucket_logging_extension()
-    
+    prefix = 'log/'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
+
     # minimal configuration
-    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
     if has_extensions:
         logging_enabled['ObjectRollTime'] = expected_object_roll_time
         logging_enabled['LoggingType'] = 'Journal'
@@ -15240,7 +15312,7 @@ def _bucket_logging_put_objects(versioned):
 
     num_keys = 5
     for j in range(num_keys):
-        name = 'myobject'+str(j)    
+        name = 'myobject'+str(j)
         client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
         if versioned:
             client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
@@ -15260,7 +15332,7 @@ def _bucket_logging_put_objects(versioned):
     assert len(keys) == 1
 
     record_type = 'Standard' if not has_extensions else 'Journal'
-    
+
     for key in keys:
         assert key.startswith('log/')
         response = client.get_object(Bucket=log_bucket_name, Key=key)
@@ -15285,9 +15357,11 @@ def test_bucket_logging_put_concurrency():
     log_bucket = get_new_bucket_resource(name=log_bucket_name)
     client = get_client(client_config=botocore.config.Config(max_pool_connections=50))
     has_extensions = _has_bucket_logging_extension()
-    
+
+    prefix = 'log/'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
     # minimal configuration
-    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
     if has_extensions:
         logging_enabled['ObjectRollTime'] = expected_object_roll_time
         logging_enabled['LoggingType'] = 'Journal'
@@ -15299,8 +15373,8 @@ def test_bucket_logging_put_concurrency():
     num_keys = 50
     t = []
     for i in range(num_keys):
-        name = 'myobject'+str(i)    
-        thr = threading.Thread(target = client.put_object, 
+        name = 'myobject'+str(i)
+        thr = threading.Thread(target = client.put_object,
                                kwargs={'Bucket': src_bucket_name, 'Key': name, 'Body': randcontent()})
         thr.start()
         t.append(thr)
@@ -15327,7 +15401,7 @@ def test_bucket_logging_put_concurrency():
     assert len(keys) == 1
 
     record_type = 'Standard' if not has_extensions else 'Journal'
-    
+
     for key in keys:
         assert key.startswith('log/')
         response = client.get_object(Bucket=log_bucket_name, Key=key)
@@ -15346,13 +15420,15 @@ def _bucket_logging_delete_objects(versioned):
 
     num_keys = 5
     for j in range(num_keys):
-        name = 'myobject'+str(j)    
+        name = 'myobject'+str(j)
         client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
         if versioned:
             client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
 
+    prefix = 'log/'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
     # minimal configuration
-    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
     if has_extensions:
         logging_enabled['ObjectRollTime'] = expected_object_roll_time
         logging_enabled['LoggingType'] = 'Journal'
@@ -15374,7 +15450,7 @@ def _bucket_logging_delete_objects(versioned):
     response = client.list_objects_v2(Bucket=log_bucket_name)
     keys = _get_keys(response)
     assert len(keys) == 1
-    
+
     if versioned:
         expected_count = 2*num_keys
     else:
@@ -15410,13 +15486,15 @@ def _bucket_logging_get_objects(versioned):
 
     num_keys = 5
     for j in range(num_keys):
-        name = 'myobject'+str(j)    
+        name = 'myobject'+str(j)
         client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
         if versioned:
             client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
 
+    prefix = 'log/'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
     # minimal configuration
-    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
     if has_extensions:
         logging_enabled['ObjectRollTime'] = expected_object_roll_time
         logging_enabled['LoggingType'] = 'Standard'
@@ -15438,7 +15516,7 @@ def _bucket_logging_get_objects(versioned):
     response = client.list_objects_v2(Bucket=log_bucket_name)
     keys = _get_keys(response)
     assert len(keys) == 1
-    
+
     if versioned:
         expected_count = 2*num_keys
     else:
@@ -15454,7 +15532,7 @@ def _bucket_logging_get_objects(versioned):
 @pytest.mark.bucket_logging
 def test_bucket_logging_get_objects():
     _bucket_logging_get_objects(False)
-    
+
 
 @pytest.mark.bucket_logging
 def test_bucket_logging_get_objects_versioned():
@@ -15477,13 +15555,15 @@ def _bucket_logging_copy_objects(versioned, another_bucket):
 
     num_keys = 5
     for j in range(num_keys):
-        name = 'myobject'+str(j)    
+        name = 'myobject'+str(j)
         client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
         if versioned:
             client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
 
+    prefix = 'log/'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name, dst_bucket_name], [prefix])
     # minimal configuration
-    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
     if has_extensions:
         logging_enabled['ObjectRollTime'] = expected_object_roll_time
         logging_enabled['LoggingType'] = 'Journal'
@@ -15512,7 +15592,7 @@ def _bucket_logging_copy_objects(versioned, another_bucket):
     response = client.list_objects_v2(Bucket=log_bucket_name)
     keys = _get_keys(response)
     assert len(keys) == 1
-    
+
     key = keys[0]
     assert key.startswith('log/')
     response = client.get_object(Bucket=log_bucket_name, Key=key)
@@ -15550,13 +15630,15 @@ def _bucket_logging_head_objects(versioned):
     log_bucket = get_new_bucket_resource(name=log_bucket_name)
     client = get_client()
     has_extensions = _has_bucket_logging_extension()
-    
+
     num_keys = 5
     for j in range(num_keys):
-        name = 'myobject'+str(j)    
+        name = 'myobject'+str(j)
         client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
 
-    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    prefix = 'log/'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
     if has_extensions:
         logging_enabled['ObjectRollTime'] = expected_object_roll_time
         logging_enabled['LoggingType'] = 'Standard'
@@ -15579,12 +15661,12 @@ def _bucket_logging_head_objects(versioned):
     response = client.list_objects_v2(Bucket=log_bucket_name)
     keys = _get_keys(response)
     assert len(keys) == 1
-    
+
     if versioned:
         expected_count = 2*num_keys
     else:
         expected_count = num_keys
-    
+
     key = keys[0]
     assert key.startswith('log/')
     response = client.get_object(Bucket=log_bucket_name, Key=key)
@@ -15612,8 +15694,10 @@ def _bucket_logging_mpu(versioned):
     client = get_client()
     has_extensions = _has_bucket_logging_extension()
 
+    prefix = 'log/'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
     # minimal configuration
-    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
     if has_extensions:
         logging_enabled['ObjectRollTime'] = expected_object_roll_time
         logging_enabled['LoggingType'] = 'Journal'
@@ -15621,7 +15705,7 @@ def _bucket_logging_mpu(versioned):
         'LoggingEnabled': logging_enabled,
     })
     assert response['ResponseMetadata']['HTTPStatusCode'] == 200
-    
+
     src_key = "myobject"
     objlen = 30 * 1024 * 1024
     (upload_id, data, parts) = _multipart_upload(bucket_name=src_bucket_name, key=src_key, size=objlen)
@@ -15629,13 +15713,13 @@ def _bucket_logging_mpu(versioned):
     if versioned:
         (upload_id, data, parts) = _multipart_upload(bucket_name=src_bucket_name, key=src_key, size=objlen)
         client.complete_multipart_upload(Bucket=src_bucket_name, Key=src_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
-    
+
     _flush_logs(client, src_bucket_name)
 
     response = client.list_objects_v2(Bucket=log_bucket_name)
     keys = _get_keys(response)
     assert len(keys) == 1
-    
+
     if versioned:
         expected_count = 4 if not has_extensions else 2
     else:
@@ -15677,8 +15761,10 @@ def _bucket_logging_mpu_copy(versioned):
         (upload_id, data, parts) = _multipart_upload(bucket_name=src_bucket_name, key=src_key, size=objlen)
         client.complete_multipart_upload(Bucket=src_bucket_name, Key=src_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
 
+    prefix = 'log/'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
     # minimal configuration
-    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
     if has_extensions:
         logging_enabled['ObjectRollTime'] = expected_object_roll_time
         logging_enabled['LoggingType'] = 'Journal'
@@ -15694,7 +15780,7 @@ def _bucket_logging_mpu_copy(versioned):
     response = client.list_objects_v2(Bucket=log_bucket_name)
     keys = _get_keys(response)
     assert len(keys) == 1
-    
+
     key = keys[0]
     assert key.startswith('log/')
     response = client.get_object(Bucket=log_bucket_name, Key=key)
@@ -15724,13 +15810,15 @@ def _bucket_logging_multi_delete(versioned):
 
     num_keys = 5
     for j in range(num_keys):
-        name = 'myobject'+str(j)    
+        name = 'myobject'+str(j)
         client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
         if versioned:
             client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
 
+    prefix = 'log/'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
     # minimal configuration
-    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix}
     if has_extensions:
         logging_enabled['ObjectRollTime'] = expected_object_roll_time
         logging_enabled['LoggingType'] = 'Journal'
@@ -15758,7 +15846,7 @@ def _bucket_logging_multi_delete(versioned):
     response = client.list_objects_v2(Bucket=log_bucket_name)
     keys = _get_keys(response)
     assert len(keys) == 1
-    
+
     if versioned:
         expected_count = 2*num_keys
     else:
@@ -15788,9 +15876,11 @@ def _bucket_logging_type(logging_type):
     log_bucket_name = get_new_bucket_name()
     log_bucket = get_new_bucket_resource(name=log_bucket_name)
     client = get_client()
+    prefix = 'log/'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
     logging_enabled = {
-            'TargetBucket': log_bucket_name, 
-            'TargetPrefix': 'log/',
+            'TargetBucket': log_bucket_name,
+            'TargetPrefix': prefix,
             'ObjectRollTime': expected_object_roll_time,
             'LoggingType': logging_type
             }
@@ -15799,19 +15889,19 @@ def _bucket_logging_type(logging_type):
     })
     num_keys = 5
     for j in range(num_keys):
-        name = 'myobject'+str(j)    
+        name = 'myobject'+str(j)
         client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
         client.head_object(Bucket=src_bucket_name, Key=name)
-    
+
     response = client.list_objects_v2(Bucket=src_bucket_name)
     src_keys = _get_keys(response)
-    
+
     _flush_logs(client, src_bucket_name)
-    
+
     response = client.list_objects_v2(Bucket=log_bucket_name)
     keys = _get_keys(response)
     assert len(keys) == 1
-    
+
     key = keys[0]
     assert key.startswith('log/')
     response = client.get_object(Bucket=log_bucket_name, Key=key)
@@ -15852,9 +15942,11 @@ def test_bucket_logging_roll_time():
     log_bucket_name = get_new_bucket_name()
     log_bucket = get_new_bucket_resource(name=log_bucket_name)
     client = get_client()
-   
+
+    prefix = 'log/'
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
     roll_time = 10
-    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/', 'ObjectRollTime': roll_time}
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix, 'ObjectRollTime': roll_time}
     response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
         'LoggingEnabled': logging_enabled,
     })
@@ -15862,7 +15954,7 @@ def test_bucket_logging_roll_time():
 
     num_keys = 5
     for j in range(num_keys):
-        name = 'myobject'+str(j)    
+        name = 'myobject'+str(j)
         client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
 
     response = client.list_objects_v2(Bucket=src_bucket_name)
@@ -15881,30 +15973,30 @@ def test_bucket_logging_roll_time():
     response = client.list_objects_v2(Bucket=log_bucket_name)
     keys = _get_keys(response)
     len(keys) == 1
-    
+
     key = keys[0]
     assert key.startswith('log/')
     response = client.get_object(Bucket=log_bucket_name, Key=key)
     body = _get_body(response)
     assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys)
     client.delete_object(Bucket=log_bucket_name, Key=key)
-    
+
     num_keys = 25
     for j in range(num_keys):
-        name = 'myobject'+str(j)    
+        name = 'myobject'+str(j)
         client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
         time.sleep(1)
-    
+
     response = client.list_objects_v2(Bucket=src_bucket_name)
     src_keys = _get_keys(response)
-    
+
     time.sleep(roll_time)
     client.put_object(Bucket=src_bucket_name, Key='myobject', Body=randcontent())
 
     response = client.list_objects_v2(Bucket=log_bucket_name)
     keys = _get_keys(response)
     assert len(keys) > 1
-   
+
     body = ''
     for key in keys:
         assert key.startswith('log/')
@@ -15919,26 +16011,32 @@ def test_bucket_logging_multiple_prefixes():
     log_bucket = get_new_bucket_resource(name=log_bucket_name)
     client = get_client()
     has_extensions = _has_bucket_logging_extension()
-   
+
     num_buckets = 5
+    log_prefixes = []
     buckets = []
     bucket_name_prefix = get_new_bucket_name()
     for j in range(num_buckets):
         src_bucket_name = bucket_name_prefix+str(j)
         src_bucket = get_new_bucket_resource(name=src_bucket_name)
-        logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': src_bucket_name+'/'}
+        log_prefixes.append(src_bucket_name+'/')
+        buckets.append(src_bucket_name)
+
+    _set_log_bucket_policy(client, log_bucket_name, buckets, log_prefixes)
+
+    for j in range(num_buckets):
+        logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': log_prefixes[j]}
         if has_extensions:
             logging_enabled['ObjectRollTime'] = expected_object_roll_time
-        response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        response = client.put_bucket_logging(Bucket=buckets[j], BucketLoggingStatus={
             'LoggingEnabled': logging_enabled,
         })
         assert response['ResponseMetadata']['HTTPStatusCode'] == 200
-        buckets.append(src_bucket_name)
 
     num_keys = 5
     for src_bucket_name in buckets:
         for j in range(num_keys):
-            name = 'myobject'+str(j)    
+            name = 'myobject'+str(j)
             client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
 
     for src_bucket_name in buckets:
@@ -15947,7 +16045,7 @@ def test_bucket_logging_multiple_prefixes():
     response = client.list_objects_v2(Bucket=log_bucket_name)
     keys = _get_keys(response)
     assert len(keys) >= num_buckets
-    
+
     for key in keys:
         response = client.get_object(Bucket=log_bucket_name, Key=key)
         body = _get_body(response)
@@ -15967,13 +16065,19 @@ def test_bucket_logging_single_prefix():
     log_bucket = get_new_bucket_resource(name=log_bucket_name)
     client = get_client()
     has_extensions = _has_bucket_logging_extension()
-   
+
     num_buckets = 5
     buckets = []
+    prefix = 'log/'
     bucket_name_prefix = get_new_bucket_name()
     for j in range(num_buckets):
         src_bucket_name = bucket_name_prefix+str(j)
         src_bucket = get_new_bucket_resource(name=src_bucket_name)
+        buckets.append(src_bucket_name)
+
+    _set_log_bucket_policy(client, log_bucket_name, buckets, [prefix])
+
+    for j in range(num_buckets):
         # minimal configuration
         logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
         if has_extensions:
@@ -15982,14 +16086,13 @@ def test_bucket_logging_single_prefix():
             'LoggingEnabled': logging_enabled,
         })
         assert response['ResponseMetadata']['HTTPStatusCode'] == 200
-        buckets.append(src_bucket_name)
 
     num_keys = 5
     bucket_ind = 0
     for src_bucket_name in buckets:
         bucket_ind += 1
         for j in range(num_keys):
-            name = 'myobject'+str(bucket_ind)+str(j)    
+            name = 'myobject'+str(bucket_ind)+str(j)
             client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
 
     _flush_logs(client, src_bucket_name)
@@ -15997,7 +16100,7 @@ def test_bucket_logging_single_prefix():
     response = client.list_objects_v2(Bucket=log_bucket_name)
     keys = _get_keys(response)
     assert len(keys) == 1
-    
+
     key = keys[0]
     response = client.get_object(Bucket=log_bucket_name, Key=key)
     body = _get_body(response)
@@ -16017,22 +16120,28 @@ def _bucket_logging_cleanup(cleanup_type, logging_type, single_prefix, concurren
 
     num_buckets = 5
     buckets = []
+    log_prefixes = []
     longer_time = expected_object_roll_time*10
     for j in range(num_buckets):
         src_bucket_name = get_new_bucket_name()
         src_bucket = get_new_bucket_resource(name=src_bucket_name)
-        logging_enabled = {'TargetBucket': log_bucket_name,
-                           'ObjectRollTime': longer_time, 'LoggingType': logging_type}
+        buckets.append(src_bucket_name)
         if single_prefix:
-            logging_enabled['TargetPrefix'] = 'log/'
+            log_prefixes.append('log/')
         else:
-            logging_enabled['TargetPrefix'] = src_bucket_name+'/'
+            log_prefixes.append(src_bucket_name+'/')
 
-        response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+    _set_log_bucket_policy(client, log_bucket_name, buckets, log_prefixes)
+
+    for j in range(num_buckets):
+        logging_enabled = {'TargetBucket': log_bucket_name,
+                           'ObjectRollTime': longer_time,
+                           'LoggingType': logging_type,
+                           'TargetPrefix': log_prefixes[j]}
+        response = client.put_bucket_logging(Bucket=buckets[j], BucketLoggingStatus={
             'LoggingEnabled': logging_enabled,
         })
         assert response['ResponseMetadata']['HTTPStatusCode'] == 200
-        buckets.append(src_bucket_name)
 
     num_keys = 10
     src_names = []
@@ -16099,6 +16208,7 @@ def _bucket_logging_cleanup(cleanup_type, logging_type, single_prefix, concurren
         # no concurrecy testing
         client.delete_bucket(Bucket=log_bucket_name)
         log_bucket = get_new_bucket_resource(name=log_bucket_name)
+        _set_log_bucket_policy(client, log_bucket_name, buckets, log_prefixes)
         old_names = src_names
         src_names = []
         for j in range(num_keys):