policy['Grants'] = old_grants
client.put_bucket_acl(Bucket=bucket_name, AccessControlPolicy=policy)
-# TODO rgw log_bucket.set_as_logging_target() gives 403 Forbidden
-# http://tracker.newdream.net/issues/984
-@pytest.mark.fails_on_rgw
-def test_logging_toggle():
- bucket_name = get_new_bucket()
- client = get_client()
-
- main_display_name = get_main_display_name()
- main_user_id = get_main_user_id()
-
- status = {'LoggingEnabled': {'TargetBucket': bucket_name, 'TargetGrants': [{'Grantee': {'DisplayName': main_display_name, 'ID': main_user_id,'Type': 'CanonicalUser'},'Permission': 'FULL_CONTROL'}], 'TargetPrefix': 'foologgingprefix'}}
-
- client.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus=status)
- client.get_bucket_logging(Bucket=bucket_name)
- status = {'LoggingEnabled': {}}
- client.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus=status)
- # NOTE: this does not actually test whether or not logging works
-
def _setup_access(bucket_acl, object_acl):
"""
Simple test fixture: create a bucket with given ACL, with objects:
r = requests.post(url, files=payload, verify=get_config_ssl_verify())
assert r.status_code == 400
+
+
+def _has_bucket_logging_extension():
+    """Return True if the client SDK accepts the ceph 'LoggingType' extension
+    to put_bucket_logging, False if parameter validation rejects it."""
+    src_bucket_name = get_new_bucket_name()
+    get_new_bucket_resource(name=src_bucket_name)
+    log_bucket_name = get_new_bucket_name()
+    get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/', 'LoggingType': 'Journal'}
+    try:
+        # validation happens client side, so the response is irrelevant here
+        client.put_bucket_logging(Bucket=src_bucket_name,
+                                  BucketLoggingStatus={'LoggingEnabled': logging_enabled})
+    except ParamValidationError:
+        return False
+    return True
+
+def _has_taget_object_key_format():  # NOTE: typo'd name kept for existing callers
+    """Return True if the client SDK accepts 'TargetObjectKeyFormat' in
+    put_bucket_logging, False if parameter validation rejects it."""
+    src_bucket_name = get_new_bucket_name()
+    get_new_bucket_resource(name=src_bucket_name)
+    log_bucket_name = get_new_bucket_name()
+    get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/', 'TargetObjectKeyFormat': {'SimplePrefix': {}}}
+    try:
+        # validation happens client side, so the response is irrelevant here
+        client.put_bucket_logging(Bucket=src_bucket_name,
+                                  BucketLoggingStatus={'LoggingEnabled': logging_enabled})
+    except ParamValidationError:
+        return False
+    return True
+
+
+import shlex
+
+def _parse_standard_log_record(record):
+    """Pretty-print one 'Standard' (AWS-style) bucket-log record as JSON."""
+    fields = (
+        'BucketOwner', 'BucketName', 'RequestDateTime', 'RemoteIP',
+        'Requester', 'RequestID', 'Operation', 'Key', 'RequestURI',
+        'HTTPStatus', 'ErrorCode', 'BytesSent', 'ObjectSize', 'TotalTime',
+        'TurnAroundTime', 'Referrer', 'UserAgent', 'VersionID', 'HostID',
+        'SigVersion', 'CipherSuite', 'AuthType', 'HostHeader', 'TLSVersion',
+        'AccessPointARN', 'ACLRequired',
+    )
+    # turn bracketed sections (e.g. the timestamp) into quoted strings so
+    # shlex keeps each of them as a single chunk
+    chunks = shlex.split(record.replace('[', '"').replace(']', '"'))
+    assert len(chunks) == 26
+    return json.dumps(dict(zip(fields, chunks)), indent=4)
+
+
+def _parse_journal_log_record(record):
+    """Pretty-print one 'Journal' (ceph extension) bucket-log record as JSON."""
+    fields = ('BucketOwner', 'BucketName', 'RequestDateTime', 'Operation',
+              'Key', 'ObjectSize', 'VersionID', 'ETAG')
+    # turn bracketed sections into quoted strings so shlex keeps them intact
+    chunks = shlex.split(record.replace('[', '"').replace(']', '"'))
+    assert len(chunks) == 8
+    return json.dumps(dict(zip(fields, chunks)), indent=4)
+
+def _parse_log_record(record, record_type):
+    """Pretty-print a bucket-log record according to *record_type*.
+
+    Raises ValueError for an unknown type; the original 'assert False' would
+    be silently skipped when tests run under 'python -O'.
+    """
+    if record_type == 'Standard':
+        return _parse_standard_log_record(record)
+    if record_type == 'Journal':
+        return _parse_journal_log_record(record)
+    raise ValueError('unknown log record type: %s' % record_type)
+
+# seconds passed as the ceph 'ObjectRollTime' extension; tests sleep this long
+# before expecting the pending log object to be committed to the log bucket
+expected_object_roll_time = 5
+
+@pytest.mark.bucket_logging
+def test_put_bucket_logging():
+    """Set and read back bucket logging configurations: minimal, simple and
+    partitioned target object key formats, and target grants (which rgw
+    does not implement)."""
+    src_bucket_name = get_new_bucket_name()
+    src_bucket = get_new_bucket_resource(name=src_bucket_name)
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    has_extensions = _has_bucket_logging_extension()
+    has_key_format = _has_taget_object_key_format()
+
+    # minimal configuration
+    logging_enabled = {
+        'TargetBucket': log_bucket_name,
+        'TargetPrefix': 'log/'
+    }
+
+    if has_extensions:
+        logging_enabled['ObjectRollTime'] = expected_object_roll_time
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    response = client.get_bucket_logging(Bucket=src_bucket_name)
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    if has_extensions:
+        # defaults for the extension fields are echoed back
+        logging_enabled['LoggingType'] = 'Standard'
+        logging_enabled['RecordsBatchSize'] = 0
+    if has_key_format:
+        # default value for key prefix is returned
+        logging_enabled['TargetObjectKeyFormat'] = {'SimplePrefix': {}}
+    assert response['LoggingEnabled'] == logging_enabled
+
+    if has_key_format:
+        # with simple target object prefix
+        logging_enabled = {
+            'TargetBucket': log_bucket_name,
+            'TargetPrefix': 'log/',
+            'TargetObjectKeyFormat': {
+                'SimplePrefix': {}
+            }
+        }
+        if has_extensions:
+            logging_enabled['ObjectRollTime'] = expected_object_roll_time
+        response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+            'LoggingEnabled': logging_enabled,
+        })
+        assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+        response = client.get_bucket_logging(Bucket=src_bucket_name)
+        assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+        if has_extensions:
+            logging_enabled['LoggingType'] = 'Standard'
+            logging_enabled['RecordsBatchSize'] = 0
+        assert response['LoggingEnabled'] == logging_enabled
+
+        # with partitioned target object prefix
+        logging_enabled = {
+            'TargetBucket': log_bucket_name,
+            'TargetPrefix': 'log/',
+            'TargetObjectKeyFormat': {
+                'PartitionedPrefix': {
+                    'PartitionDateSource': 'DeliveryTime'
+                }
+            }
+        }
+        if has_extensions:
+            logging_enabled['ObjectRollTime'] = expected_object_roll_time
+        response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+            'LoggingEnabled': logging_enabled,
+        })
+        assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+        response = client.get_bucket_logging(Bucket=src_bucket_name)
+        assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+        if has_extensions:
+            logging_enabled['LoggingType'] = 'Standard'
+            logging_enabled['RecordsBatchSize'] = 0
+        assert response['LoggingEnabled'] == logging_enabled
+
+    # with target grant (not implemented in RGW)
+    main_display_name = get_main_display_name()
+    main_user_id = get_main_user_id()
+    logging_enabled = {
+        'TargetBucket': log_bucket_name,
+        'TargetPrefix': 'log/',
+        'TargetGrants': [{'Grantee': {'DisplayName': main_display_name, 'ID': main_user_id,'Type': 'CanonicalUser'},'Permission': 'FULL_CONTROL'}]
+    }
+    if has_extensions:
+        logging_enabled['ObjectRollTime'] = expected_object_roll_time
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    response = client.get_bucket_logging(Bucket=src_bucket_name)
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    if has_extensions:
+        logging_enabled['LoggingType'] = 'Standard'
+        logging_enabled['RecordsBatchSize'] = 0
+    # target grants are not implemented
+    logging_enabled.pop('TargetGrants')
+    if has_key_format:
+        # default value for key prefix is returned
+        logging_enabled['TargetObjectKeyFormat'] = {'SimplePrefix': {}}
+    assert response['LoggingEnabled'] == logging_enabled
+
+
+@pytest.mark.bucket_logging
+def test_put_bucket_logging_errors():
+    """Negative put_bucket_logging cases: missing source/target buckets,
+    chained logging targets, bad partition date source, bad LoggingType."""
+    src_bucket_name = get_new_bucket_name()
+    src_bucket = get_new_bucket_resource(name=src_bucket_name)
+    log_bucket_name1 = get_new_bucket_name()
+    log_bucket1 = get_new_bucket_resource(name=log_bucket_name1)
+    client = get_client()
+
+    # invalid source bucket
+    try:
+        response = client.put_bucket_logging(Bucket=src_bucket_name+'kaboom', BucketLoggingStatus={
+            'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': 'log/'},
+        })
+        assert False, 'expected failure'
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'NoSuchBucket'
+
+    # invalid log bucket
+    # NOTE(review): expects 'NoSuchKey' rather than 'NoSuchBucket' — confirm
+    # this is the intended rgw error code for a missing *target* bucket
+    try:
+        response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+            'LoggingEnabled': {'TargetBucket': log_bucket_name1+'kaboom', 'TargetPrefix': 'log/'},
+        })
+        assert False, 'expected failure'
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'NoSuchKey'
+
+    # log bucket has bucket logging
+    log_bucket_name2 = get_new_bucket_name()
+    log_bucket2 = get_new_bucket_resource(name=log_bucket_name2)
+    response = client.put_bucket_logging(Bucket=log_bucket_name2, BucketLoggingStatus={
+        'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': 'log/'},
+    })
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    try:
+        response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+            'LoggingEnabled': {'TargetBucket': log_bucket_name2, 'TargetPrefix': 'log/'},
+        })
+        assert False, 'expected failure'
+    except ClientError as e:
+        assert e.response['Error']['Code'] == 'InvalidArgument'
+
+    if _has_taget_object_key_format():
+        # invalid partition prefix
+        logging_enabled = {
+            'TargetBucket': log_bucket_name1,
+            'TargetPrefix': 'log/',
+            'TargetObjectKeyFormat': {
+                'PartitionedPrefix': {
+                    'PartitionDateSource': 'kaboom'
+                }
+            }
+        }
+        try:
+            response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+                'LoggingEnabled': logging_enabled,
+            })
+            assert False, 'expected failure'
+        except ClientError as e:
+            assert e.response['Error']['Code'] == 'MalformedXML'
+
+    # TODO: log bucket is encrypted
+    #_put_bucket_encryption_s3(client, log_bucket_name)
+    #try:
+    #    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+    #        'LoggingEnabled': {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'},
+    #    })
+    #    assert False, 'expected failure'
+    #except ClientError as e:
+    #    assert e.response['Error']['Code'] == 'InvalidArgument'
+
+    if _has_bucket_logging_extension():
+        # invalid value for the ceph 'LoggingType' extension
+        try:
+            response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+                'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': 'log/', 'LoggingType': 'kaboom'},
+            })
+            assert False, 'expected failure'
+        except ClientError as e:
+            assert e.response['Error']['Code'] == 'MalformedXML'
+
+
+@pytest.mark.bucket_logging
+def test_rm_bucket_logging():
+    """Verify that putting an empty BucketLoggingStatus disables logging."""
+    src_bucket_name = get_new_bucket_name()
+    src_bucket = get_new_bucket_resource(name=src_bucket_name)
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    # an empty status removes the logging configuration
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={})
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    response = client.get_bucket_logging(Bucket=src_bucket_name)
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    assert 'LoggingEnabled' not in response
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_put_bucket_logging_extensions():
+    """Set and read back a configuration using the ceph extensions
+    (LoggingType, ObjectRollTime, RecordsBatchSize); not valid on AWS."""
+    if not _has_bucket_logging_extension():
+        pytest.skip('ceph extension to bucket logging not supported at client')
+    src_bucket_name = get_new_bucket_name()
+    src_bucket = get_new_bucket_resource(name=src_bucket_name)
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    logging_enabled = {'TargetBucket': log_bucket_name,
+            'TargetPrefix': 'log/',
+            'LoggingType': 'Standard',
+            'ObjectRollTime': expected_object_roll_time,
+            'RecordsBatchSize': 0
+            }
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    response = client.get_bucket_logging(Bucket=src_bucket_name)
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    # default value for key prefix is returned
+    logging_enabled['TargetObjectKeyFormat'] = {'SimplePrefix': {}}
+    assert response['LoggingEnabled'] == logging_enabled
+
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+def _verify_records(records, bucket_name, event_type, src_keys, record_type, expected_count):
+    """Return True if exactly *expected_count* records in *records* mention
+    *bucket_name* and *event_type* and reference one of *src_keys*.
+
+    Matching is substring-based; each record contributes at most one entry.
+    """
+    keys_found = []
+    # splitlines() already yields an iterable; the old iter() was redundant
+    for record in records.splitlines():
+        logger.info('bucket log record: %s', _parse_log_record(record, record_type))
+        if bucket_name in record and event_type in record:
+            for key in src_keys:
+                if key in record:
+                    keys_found.append(key)
+                    break
+    logger.info('keys found in bucket log: %s', str(keys_found))
+    logger.info('keys from the source bucket: %s', str(src_keys))
+    return len(keys_found) == expected_count
+
+def randcontent():
+    """Return a random lowercase string between 10 and 1024 characters long."""
+    alphabet = string.ascii_lowercase
+    size = random.randint(10, 1024)
+    return ''.join(random.choice(alphabet) for _ in range(size))
+
+
+def _bucket_logging_put_objects(versioned):
+    """Upload objects (twice per key when *versioned*) with logging enabled
+    and verify a REST.PUT.OBJECT record exists for every upload."""
+    src_bucket_name = get_new_bucket()
+    if versioned:
+        check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled")
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    has_extensions = _has_bucket_logging_extension()
+
+    # minimal configuration
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    if has_extensions:
+        logging_enabled['ObjectRollTime'] = expected_object_roll_time
+        logging_enabled['LoggingType'] = 'Journal'
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    num_keys = 5
+    for j in range(num_keys):
+        name = 'myobject'+str(j)
+        client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+        if versioned:
+            client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+
+    if versioned:
+        expected_count = 2*num_keys
+    else:
+        expected_count = num_keys
+
+    response = client.list_objects_v2(Bucket=src_bucket_name)
+    src_keys = _get_keys(response)
+
+    # after the roll time, another request should trigger commit of the
+    # pending log object — presumably; verify against rgw behavior
+    time.sleep(expected_object_roll_time)
+    client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy')
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    keys = _get_keys(response)
+    assert len(keys) == 1
+
+    record_type = 'Standard' if not has_extensions else 'Journal'
+
+    for key in keys:
+        assert key.startswith('log/')
+        response = client.get_object(Bucket=log_bucket_name, Key=key)
+        body = _get_body(response)
+        assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, record_type, expected_count)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_put_objects():
+    """PUT-object logging on an unversioned bucket."""
+    _bucket_logging_put_objects(False)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_put_objects_versioned():
+    """PUT-object logging on a versioned bucket."""
+    _bucket_logging_put_objects(True)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_put_concurrency():
+    """Upload objects from 50 concurrent threads and verify each produced a
+    REST.PUT.OBJECT record in the single committed log object."""
+    src_bucket_name = get_new_bucket()
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client(client_config=botocore.config.Config(max_pool_connections=50))
+    has_extensions = _has_bucket_logging_extension()
+
+    # minimal configuration
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    if has_extensions:
+        logging_enabled['ObjectRollTime'] = expected_object_roll_time
+        logging_enabled['LoggingType'] = 'Journal'
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    num_keys = 50
+    t = []
+    for i in range(num_keys):
+        name = 'myobject'+str(i)
+        thr = threading.Thread(target = client.put_object,
+                kwargs={'Bucket': src_bucket_name, 'Key': name, 'Body': randcontent()})
+        thr.start()
+        t.append(thr)
+    _do_wait_completion(t)
+
+    response = client.list_objects_v2(Bucket=src_bucket_name)
+    src_keys = _get_keys(response)
+
+    # after the roll time, more requests should flush the pending log object
+    time.sleep(expected_object_roll_time)
+    t = []
+    for i in range(num_keys):
+        thr = threading.Thread(target = client.put_object,
+                kwargs={'Bucket': src_bucket_name, 'Key': 'dummy', 'Body': 'dummy'})
+        thr.start()
+        t.append(thr)
+    _do_wait_completion(t)
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    keys = _get_keys(response)
+    assert len(keys) == 1
+
+    record_type = 'Standard' if not has_extensions else 'Journal'
+
+    for key in keys:
+        logger.info('logging object: %s', key)
+        assert key.startswith('log/')
+        response = client.get_object(Bucket=log_bucket_name, Key=key)
+        body = _get_body(response)
+        assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, record_type, num_keys)
+
+
+def _bucket_logging_delete_objects(versioned):
+    """Delete objects one by one (all versions when *versioned*) with logging
+    enabled and verify a REST.DELETE.OBJECT record exists for every delete."""
+    src_bucket_name = get_new_bucket()
+    if versioned:
+        check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled")
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    has_extensions = _has_bucket_logging_extension()
+
+    num_keys = 5
+    for j in range(num_keys):
+        name = 'myobject'+str(j)
+        client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+        if versioned:
+            client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+
+    # minimal configuration
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    if has_extensions:
+        logging_enabled['ObjectRollTime'] = expected_object_roll_time
+        logging_enabled['LoggingType'] = 'Journal'
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    response = client.list_objects_v2(Bucket=src_bucket_name)
+    src_keys = _get_keys(response)
+    for key in src_keys:
+        if versioned:
+            response = client.head_object(Bucket=src_bucket_name, Key=key)
+            client.delete_object(Bucket=src_bucket_name, Key=key, VersionId=response['VersionId'])
+        client.delete_object(Bucket=src_bucket_name, Key=key)
+
+    time.sleep(expected_object_roll_time)
+    client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy')
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    keys = _get_keys(response)
+    assert len(keys) == 1
+
+    if versioned:
+        expected_count = 2*num_keys
+    else:
+        expected_count = num_keys
+
+    key = keys[0]
+    assert key.startswith('log/')
+    response = client.get_object(Bucket=log_bucket_name, Key=key)
+    body = _get_body(response)
+    record_type = 'Standard' if not has_extensions else 'Journal'
+    assert _verify_records(body, src_bucket_name, 'REST.DELETE.OBJECT', src_keys, record_type, expected_count)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_delete_objects():
+    """DELETE-object logging on an unversioned bucket."""
+    _bucket_logging_delete_objects(False)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_delete_objects_versioned():
+    """DELETE-object logging on a versioned bucket."""
+    _bucket_logging_delete_objects(True)
+
+
+def _bucket_logging_get_objects(versioned):
+    """Read objects back (by version when *versioned*) with 'Standard' logging
+    and verify a REST.GET.OBJECT record exists for every read.
+
+    Note: the stray @pytest.mark.bucket_logging was dropped — pytest never
+    collects underscore-prefixed helpers, and sibling helpers carry no mark.
+    """
+    src_bucket_name = get_new_bucket()
+    if versioned:
+        check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled")
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    has_extensions = _has_bucket_logging_extension()
+
+    num_keys = 5
+    for j in range(num_keys):
+        name = 'myobject'+str(j)
+        client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+        if versioned:
+            client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+
+    # minimal configuration
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    if has_extensions:
+        logging_enabled['ObjectRollTime'] = expected_object_roll_time
+        logging_enabled['LoggingType'] = 'Standard'
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    response = client.list_objects_v2(Bucket=src_bucket_name)
+    src_keys = _get_keys(response)
+    for key in src_keys:
+        if versioned:
+            response = client.head_object(Bucket=src_bucket_name, Key=key)
+            client.get_object(Bucket=src_bucket_name, Key=key, VersionId=response['VersionId'])
+        client.get_object(Bucket=src_bucket_name, Key=key)
+
+    time.sleep(expected_object_roll_time)
+    client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy')
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    keys = _get_keys(response)
+    assert len(keys) == 1
+
+    if versioned:
+        expected_count = 2*num_keys
+    else:
+        expected_count = num_keys
+
+    key = keys[0]
+    assert key.startswith('log/')
+    response = client.get_object(Bucket=log_bucket_name, Key=key)
+    body = _get_body(response)
+    assert _verify_records(body, src_bucket_name, 'REST.GET.OBJECT', src_keys, 'Standard', expected_count)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_get_objects():
+    """GET-object logging on an unversioned bucket."""
+    _bucket_logging_get_objects(False)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_get_objects_versioned():
+    """GET-object logging on a versioned bucket."""
+    _bucket_logging_get_objects(True)
+
+
+def _bucket_logging_copy_objects(versioned, another_bucket):
+    """Copy objects (into the same or *another_bucket*) with logging enabled
+    and verify a REST.PUT.OBJECT record exists for every copy.
+
+    Note: the stray @pytest.mark.bucket_logging was dropped — pytest never
+    collects underscore-prefixed helpers, and sibling helpers carry no mark.
+    """
+    src_bucket_name = get_new_bucket()
+    if another_bucket:
+        dst_bucket_name = get_new_bucket()
+    else:
+        dst_bucket_name = src_bucket_name
+    if versioned:
+        check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled")
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    has_extensions = _has_bucket_logging_extension()
+
+    num_keys = 5
+    for j in range(num_keys):
+        name = 'myobject'+str(j)
+        client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+        if versioned:
+            client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+
+    # minimal configuration
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    if has_extensions:
+        logging_enabled['ObjectRollTime'] = expected_object_roll_time
+        logging_enabled['LoggingType'] = 'Journal'
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    if another_bucket:
+        response = client.put_bucket_logging(Bucket=dst_bucket_name, BucketLoggingStatus={
+            'LoggingEnabled': logging_enabled,
+        })
+        assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    response = client.list_objects_v2(Bucket=src_bucket_name)
+    src_keys = _get_keys(response)
+    dst_keys = []
+    for key in src_keys:
+        dst_keys.append('copy_of_'+key)
+        if another_bucket:
+            client.copy_object(Bucket=dst_bucket_name, Key='copy_of_'+key, CopySource={'Bucket': src_bucket_name, 'Key': key})
+        else:
+            client.copy_object(Bucket=src_bucket_name, Key='copy_of_'+key, CopySource={'Bucket': src_bucket_name, 'Key': key})
+
+    time.sleep(expected_object_roll_time)
+    client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy')
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    keys = _get_keys(response)
+    assert len(keys) == 1
+
+    key = keys[0]
+    assert key.startswith('log/')
+    response = client.get_object(Bucket=log_bucket_name, Key=key)
+    body = _get_body(response)
+    record_type = 'Standard' if not has_extensions else 'Journal'
+    assert _verify_records(body, dst_bucket_name, 'REST.PUT.OBJECT', dst_keys, record_type, num_keys)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_copy_objects():
+    """Copy-object logging: same bucket, unversioned."""
+    _bucket_logging_copy_objects(False, False)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_copy_objects_versioned():
+    """Copy-object logging: same bucket, versioned."""
+    _bucket_logging_copy_objects(True, False)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_copy_objects_bucket():
+    """Copy-object logging: across buckets, unversioned."""
+    _bucket_logging_copy_objects(False, True)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_copy_objects_bucket_versioned():
+    """Copy-object logging: across buckets, versioned."""
+    _bucket_logging_copy_objects(True, True)
+
+
+def _bucket_logging_head_objects(versioned):
+    """HEAD objects (plain plus by-version when *versioned*) with 'Standard'
+    logging and verify a REST.HEAD.OBJECT record exists for every request.
+
+    Note: the stray @pytest.mark.bucket_logging was dropped — pytest never
+    collects underscore-prefixed helpers, and sibling helpers carry no mark.
+    """
+    src_bucket_name = get_new_bucket()
+    if versioned:
+        check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled")
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    has_extensions = _has_bucket_logging_extension()
+
+    num_keys = 5
+    for j in range(num_keys):
+        name = 'myobject'+str(j)
+        client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    if has_extensions:
+        logging_enabled['ObjectRollTime'] = expected_object_roll_time
+        logging_enabled['LoggingType'] = 'Standard'
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    response = client.list_objects_v2(Bucket=src_bucket_name)
+    src_keys = _get_keys(response)
+    for key in src_keys:
+        if versioned:
+            # two HEADs per key in this branch, hence 2*num_keys below
+            response = client.head_object(Bucket=src_bucket_name, Key=key)
+            client.head_object(Bucket=src_bucket_name, Key=key, VersionId=response['VersionId'])
+        else:
+            client.head_object(Bucket=src_bucket_name, Key=key)
+
+    time.sleep(expected_object_roll_time)
+    client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy')
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    keys = _get_keys(response)
+    assert len(keys) == 1
+
+    if versioned:
+        expected_count = 2*num_keys
+    else:
+        expected_count = num_keys
+
+    key = keys[0]
+    assert key.startswith('log/')
+    response = client.get_object(Bucket=log_bucket_name, Key=key)
+    body = _get_body(response)
+    assert _verify_records(body, src_bucket_name, 'REST.HEAD.OBJECT', src_keys, 'Standard', expected_count)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_head_objects():
+    """HEAD-object logging on an unversioned bucket."""
+    _bucket_logging_head_objects(False)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_head_objects_versioned():
+    """HEAD-object logging on a versioned bucket."""
+    _bucket_logging_head_objects(True)
+
+
+def _bucket_logging_mpu(versioned):
+    """Complete multipart uploads with logging enabled and verify their
+    REST.POST.UPLOAD records.
+
+    Note: the stray @pytest.mark.bucket_logging was dropped — pytest never
+    collects underscore-prefixed helpers, and sibling helpers carry no mark.
+    """
+    src_bucket_name = get_new_bucket()
+    if versioned:
+        check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled")
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    has_extensions = _has_bucket_logging_extension()
+
+    # minimal configuration
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    if has_extensions:
+        logging_enabled['ObjectRollTime'] = expected_object_roll_time
+        logging_enabled['LoggingType'] = 'Journal'
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    src_key = "myobject"
+    objlen = 30 * 1024 * 1024
+    (upload_id, data, parts) = _multipart_upload(bucket_name=src_bucket_name, key=src_key, size=objlen)
+    client.complete_multipart_upload(Bucket=src_bucket_name, Key=src_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+    if versioned:
+        (upload_id, data, parts) = _multipart_upload(bucket_name=src_bucket_name, key=src_key, size=objlen)
+        client.complete_multipart_upload(Bucket=src_bucket_name, Key=src_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+
+    time.sleep(expected_object_roll_time)
+    client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy')
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    keys = _get_keys(response)
+    assert len(keys) == 1
+
+    # NOTE(review): 'Standard' logging yields twice as many matching records
+    # per completed upload as 'Journal' — confirm which requests are counted
+    if versioned:
+        expected_count = 4 if not has_extensions else 2
+    else:
+        expected_count = 2 if not has_extensions else 1
+
+    key = keys[0]
+    assert key.startswith('log/')
+    response = client.get_object(Bucket=log_bucket_name, Key=key)
+    body = _get_body(response)
+    record_type = 'Standard' if not has_extensions else 'Journal'
+    assert _verify_records(body, src_bucket_name, 'REST.POST.UPLOAD', [src_key, src_key], record_type, expected_count)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_mpu():
+    """Multipart-upload logging on an unversioned bucket."""
+    _bucket_logging_mpu(False)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_mpu_versioned():
+    """Multipart-upload logging on a versioned bucket."""
+    _bucket_logging_mpu(True)
+
+
+def _bucket_logging_mpu_copy(versioned):
+    """Copy an object created via multipart upload, with logging enabled, and
+    verify the copy's REST.PUT.OBJECT record.
+
+    Note: the stray @pytest.mark.bucket_logging was dropped — pytest never
+    collects underscore-prefixed helpers, and sibling helpers carry no mark.
+    """
+    src_bucket_name = get_new_bucket()
+    if versioned:
+        check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled")
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    has_extensions = _has_bucket_logging_extension()
+
+    src_key = "myobject"
+    objlen = 30 * 1024 * 1024
+    (upload_id, data, parts) = _multipart_upload(bucket_name=src_bucket_name, key=src_key, size=objlen)
+    client.complete_multipart_upload(Bucket=src_bucket_name, Key=src_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+    if versioned:
+        (upload_id, data, parts) = _multipart_upload(bucket_name=src_bucket_name, key=src_key, size=objlen)
+        client.complete_multipart_upload(Bucket=src_bucket_name, Key=src_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
+
+    # minimal configuration
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    if has_extensions:
+        logging_enabled['ObjectRollTime'] = expected_object_roll_time
+        logging_enabled['LoggingType'] = 'Journal'
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    client.copy_object(Bucket=src_bucket_name, Key='copy_of_'+src_key, CopySource={'Bucket': src_bucket_name, 'Key': src_key})
+
+    time.sleep(expected_object_roll_time)
+    client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy')
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    keys = _get_keys(response)
+    assert len(keys) == 1
+
+    key = keys[0]
+    assert key.startswith('log/')
+    response = client.get_object(Bucket=log_bucket_name, Key=key)
+    body = _get_body(response)
+    record_type = 'Standard' if not has_extensions else 'Journal'
+    assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', ['copy_of_'+src_key], record_type, 1)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_mpu_copy():
+    """Copy-of-multipart-object logging on an unversioned bucket."""
+    _bucket_logging_mpu_copy(False)
+
+
+@pytest.mark.bucket_logging
+def test_bucket_logging_mpu_copy_versioned():
+    """Copy-of-multipart-object logging on a versioned bucket."""
+    _bucket_logging_mpu_copy(True)
+
+
+def _bucket_logging_multi_delete(versioned):
+    """Issue one multi-object delete with logging enabled and verify
+    REST.POST.DELETE_MULTI_OBJECT records for every key (and version)."""
+    src_bucket_name = get_new_bucket()
+    if versioned:
+        check_configure_versioning_retry(src_bucket_name, "Enabled", "Enabled")
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    has_extensions = _has_bucket_logging_extension()
+
+    num_keys = 5
+    for j in range(num_keys):
+        name = 'myobject'+str(j)
+        client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+        if versioned:
+            client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+
+    # minimal configuration
+    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
+    if has_extensions:
+        logging_enabled['ObjectRollTime'] = expected_object_roll_time
+        logging_enabled['LoggingType'] = 'Journal'
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    response = client.list_objects_v2(Bucket=src_bucket_name)
+    src_keys = _get_keys(response)
+    if versioned:
+        # delete every version explicitly
+        response = client.list_object_versions(Bucket=src_bucket_name)
+        objs_list = []
+        for version in response['Versions']:
+            obj_dict = {'Key': version['Key'], 'VersionId': version['VersionId']}
+            objs_list.append(obj_dict)
+        objs_dict = {'Objects': objs_list}
+        client.delete_objects(Bucket=src_bucket_name, Delete=objs_dict)
+    else:
+        objs_dict = _make_objs_dict(key_names=src_keys)
+        client.delete_objects(Bucket=src_bucket_name, Delete=objs_dict)
+
+    time.sleep(expected_object_roll_time)
+    client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy')
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    keys = _get_keys(response)
+    assert len(keys) == 1
+
+    if versioned:
+        expected_count = 2*num_keys
+    else:
+        expected_count = num_keys
+
+    key = keys[0]
+    assert key.startswith('log/')
+    response = client.get_object(Bucket=log_bucket_name, Key=key)
+    body = _get_body(response)
+    record_type = 'Standard' if not has_extensions else 'Journal'
+    assert _verify_records(body, src_bucket_name, "REST.POST.DELETE_MULTI_OBJECT", src_keys, record_type, expected_count)
+
+
@pytest.mark.bucket_logging
def test_bucket_logging_multi_delete():
    # Multi-object delete logging on an unversioned bucket.
    _bucket_logging_multi_delete(False)
+
+
@pytest.mark.bucket_logging
def test_bucket_logging_multi_delete_versioned():
    # Multi-object delete logging on a versioned bucket (each version logged).
    _bucket_logging_multi_delete(True)
+
+
def _bucket_logging_type(logging_type):
    """Verify which operations are recorded for a given logging type.

    'Journal' mode (ceph extension) must record mutating ops (PUT) but not
    reads (HEAD); 'Standard' mode must record both.

    :param logging_type: 'Journal' or 'Standard'
    """
    src_bucket_name = get_new_bucket_name()
    src_bucket = get_new_bucket_resource(name=src_bucket_name)
    log_bucket_name = get_new_bucket_name()
    log_bucket = get_new_bucket_resource(name=log_bucket_name)
    client = get_client()
    logging_enabled = {
        'TargetBucket': log_bucket_name,
        'TargetPrefix': 'log/',
        'ObjectRollTime': expected_object_roll_time,
        'LoggingType': logging_type
    }
    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
        'LoggingEnabled': logging_enabled,
    })
    # fix: the configuration response was previously ignored; every sibling test
    # asserts 200 here so a failed setup surfaces immediately instead of as a
    # confusing empty-log failure later
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    num_keys = 5
    for j in range(num_keys):
        name = 'myobject'+str(j)
        client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
        # HEAD each object so we can check whether reads are logged
        client.head_object(Bucket=src_bucket_name, Key=name)

    response = client.list_objects_v2(Bucket=src_bucket_name)
    src_keys = _get_keys(response)

    # wait past the roll time, then issue more requests to flush the pending log object
    time.sleep(expected_object_roll_time)
    client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy')
    client.head_object(Bucket=src_bucket_name, Key='dummy')

    response = client.list_objects_v2(Bucket=log_bucket_name)
    keys = _get_keys(response)
    assert len(keys) == 1

    key = keys[0]
    assert key.startswith('log/')
    response = client.get_object(Bucket=log_bucket_name, Key=key)
    body = _get_body(response)
    if logging_type == 'Journal':
        # journal mode: PUTs recorded, HEADs must NOT appear
        assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Journal', num_keys)
        assert not _verify_records(body, src_bucket_name, 'REST.HEAD.OBJECT', src_keys, 'Journal', num_keys)
    elif logging_type == 'Standard':
        # standard mode: both reads and writes recorded
        assert _verify_records(body, src_bucket_name, 'REST.HEAD.OBJECT', src_keys, 'Standard', num_keys)
        assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys)
    else:
        assert False, 'invalid logging type:'+logging_type
+
+
@pytest.mark.bucket_logging
@pytest.mark.fails_on_aws
def test_bucket_logging_event_type_j():
    # 'Journal' LoggingType is a ceph extension — skip when the client schema lacks it
    if not _has_bucket_logging_extension():
        pytest.skip('ceph extension to bucket logging not supported at client')
    _bucket_logging_type('Journal')
+
+
@pytest.mark.bucket_logging
@pytest.mark.fails_on_aws
def test_bucket_logging_event_type_s():
    # explicit 'Standard' LoggingType still requires the ceph extension schema
    if not _has_bucket_logging_extension():
        pytest.skip('ceph extension to bucket logging not supported at client')
    _bucket_logging_type('Standard')
+
+
@pytest.mark.bucket_logging
@pytest.mark.fails_on_aws
def test_bucket_logging_roll_time():
    """Verify that the ObjectRollTime extension controls when log objects appear.

    Before the roll time elapses no log object should exist; after it elapses
    (and a follow-up request flushes the pending record), the log object(s)
    must contain all expected PUT records.
    """
    if not _has_bucket_logging_extension():
        pytest.skip('ceph extension to bucket logging not supported at client')
    src_bucket_name = get_new_bucket_name()
    src_bucket = get_new_bucket_resource(name=src_bucket_name)
    log_bucket_name = get_new_bucket_name()
    log_bucket = get_new_bucket_resource(name=log_bucket_name)
    client = get_client()

    roll_time = 10
    logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/', 'ObjectRollTime': roll_time}
    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
        'LoggingEnabled': logging_enabled,
    })
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200

    num_keys = 5
    for j in range(num_keys):
        name = 'myobject'+str(j)
        client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())

    response = client.list_objects_v2(Bucket=src_bucket_name)
    src_keys = _get_keys(response)

    # only half the roll time has passed: nothing should be rolled yet
    time.sleep(roll_time/2)
    client.put_object(Bucket=src_bucket_name, Key='myobject', Body=randcontent())

    response = client.list_objects_v2(Bucket=log_bucket_name)
    keys = _get_keys(response)
    assert len(keys) == 0

    # the other half passes; the next request flushes the rolled log object
    time.sleep(roll_time/2)
    client.put_object(Bucket=src_bucket_name, Key='myobject', Body=randcontent())

    response = client.list_objects_v2(Bucket=log_bucket_name)
    keys = _get_keys(response)
    # fix: this comparison was missing its `assert` and silently did nothing
    assert len(keys) == 1

    key = keys[0]
    assert key.startswith('log/')
    response = client.get_object(Bucket=log_bucket_name, Key=key)
    body = _get_body(response)
    assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys)
    client.delete_object(Bucket=log_bucket_name, Key=key)

    # now spread 25 PUTs over more than one roll period so multiple log objects roll
    num_keys = 25
    for j in range(num_keys):
        name = 'myobject'+str(j)
        client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
        time.sleep(1)

    response = client.list_objects_v2(Bucket=src_bucket_name)
    src_keys = _get_keys(response)

    time.sleep(roll_time)
    client.put_object(Bucket=src_bucket_name, Key='myobject', Body=randcontent())

    response = client.list_objects_v2(Bucket=log_bucket_name)
    keys = _get_keys(response)
    assert len(keys) > 1

    # records may span several log objects; concatenate and verify the total
    body = ''
    for key in keys:
        assert key.startswith('log/')
        response = client.get_object(Bucket=log_bucket_name, Key=key)
        body += _get_body(response)
    assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys+1)
+
+
@pytest.mark.bucket_logging
def test_bucket_logging_multiple_prefixes():
    """Several source buckets log into one log bucket, each under its own prefix.

    Each log object's key must start with exactly one source bucket's prefix,
    and must contain that bucket's PUT records.
    """
    log_bucket_name = get_new_bucket_name()
    log_bucket = get_new_bucket_resource(name=log_bucket_name)
    client = get_client()
    has_extensions = _has_bucket_logging_extension()

    num_buckets = 5
    num_keys = 5
    bucket_name_prefix = get_new_bucket_name()
    buckets = []
    for idx in range(num_buckets):
        src = f'{bucket_name_prefix}{idx}'
        get_new_bucket_resource(name=src)
        # each source bucket gets its own prefix inside the shared log bucket
        conf = {'TargetBucket': log_bucket_name, 'TargetPrefix': f'{src}/'}
        if has_extensions:
            conf['ObjectRollTime'] = expected_object_roll_time
        put_resp = client.put_bucket_logging(
            Bucket=src, BucketLoggingStatus={'LoggingEnabled': conf})
        assert put_resp['ResponseMetadata']['HTTPStatusCode'] == 200
        buckets.append(src)

    for src in buckets:
        for idx in range(num_keys):
            client.put_object(Bucket=src, Key=f'myobject{idx}', Body=randcontent())

    # wait out the roll time, then issue one request per bucket to flush its log
    time.sleep(expected_object_roll_time)
    for src in buckets:
        client.head_object(Bucket=src, Key='myobject0')

    listing = client.list_objects_v2(Bucket=log_bucket_name)
    log_keys = _get_keys(listing)
    assert len(log_keys) >= num_buckets

    for log_key in log_keys:
        log_obj = client.get_object(Bucket=log_bucket_name, Key=log_key)
        log_body = _get_body(log_obj)
        # every log object must belong to (at least) one source bucket's prefix
        owners = [src for src in buckets if log_key.startswith(src)]
        assert owners
        for src in owners:
            src_listing = client.list_objects_v2(Bucket=src)
            assert _verify_records(log_body, src, 'REST.PUT.OBJECT',
                                   _get_keys(src_listing), 'Standard', num_keys)
+
+
@pytest.mark.bucket_logging
def test_bucket_logging_single_prefix():
    """Several source buckets share one log bucket AND one prefix.

    All records land in a single log object, which must contain the PUT
    records of every source bucket.
    """
    log_bucket_name = get_new_bucket_name()
    log_bucket = get_new_bucket_resource(name=log_bucket_name)
    client = get_client()
    has_extensions = _has_bucket_logging_extension()

    num_buckets = 5
    buckets = []
    bucket_name_prefix = get_new_bucket_name()
    for j in range(num_buckets):
        src_bucket_name = bucket_name_prefix+str(j)
        src_bucket = get_new_bucket_resource(name=src_bucket_name)
        # minimal configuration; all buckets deliberately share the same prefix
        logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}
        if has_extensions:
            logging_enabled['ObjectRollTime'] = expected_object_roll_time
        response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
            'LoggingEnabled': logging_enabled,
        })
        assert response['ResponseMetadata']['HTTPStatusCode'] == 200
        buckets.append(src_bucket_name)

    num_keys = 5
    bucket_ind = 0
    for src_bucket_name in buckets:
        bucket_ind += 1
        for j in range(num_keys):
            # key names are unique across buckets so records are attributable
            name = 'myobject'+str(bucket_ind)+str(j)
            client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())

    # wait out the roll time, then flush via one more request on the first bucket
    time.sleep(expected_object_roll_time)
    client.put_object(Bucket=buckets[0], Key='dummy', Body='dummy')

    response = client.list_objects_v2(Bucket=log_bucket_name)
    keys = _get_keys(response)
    assert len(keys) == 1

    key = keys[0]
    response = client.get_object(Bucket=log_bucket_name, Key=key)
    body = _get_body(response)
    # fix: previously a single `found` flag was overwritten on every iteration,
    # so only the LAST bucket's records were actually checked; assert each
    # bucket's records individually (matching test_bucket_logging_multiple_prefixes)
    for src_bucket_name in buckets:
        response = client.list_objects_v2(Bucket=src_bucket_name)
        src_keys = _get_keys(response)
        assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys)