found = _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, 'Standard', num_keys)
assert found
+
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_bucket_logging_object_meta():
+ if not _has_bucket_logging_extension():
+ pytest.skip('ceph extension to bucket logging not supported at client')
+ client = get_client()
+ src_bucket_name = get_new_bucket_name()
+ src_bucket = client.create_bucket(Bucket=src_bucket_name, ObjectLockEnabledForBucket=True)
+ log_bucket_name = get_new_bucket_name()
+ log_bucket = get_new_bucket_resource(name=log_bucket_name)
+
+ prefix = 'log/'
+ _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
+ logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': prefix, 'LoggingType': 'Journal'}
+ response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+ 'LoggingEnabled': logging_enabled,
+ })
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+ name = 'myobject'
+ response = client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+ version_id = response['VersionId']
+ # PutObjectAcl
+ client.put_object_acl(ACL='public-read-write', Bucket=src_bucket_name, Key=name, VersionId=version_id)
+ # PutObjectTagging
+ client.put_object_tagging(Bucket=src_bucket_name, Key=name, VersionId=version_id,
+ Tagging={'TagSet': [{'Key': 'tag1', 'Value': 'value1'}, {'Key': 'tag2', 'Value': 'value2'}]})
+ # DeleteObjectTagging
+ client.delete_object_tagging(Bucket=src_bucket_name, Key=name, VersionId=version_id)
+ # PutObjectLegalHold
+ client.put_object_legal_hold(Bucket=src_bucket_name, Key=name, LegalHold={'Status': 'ON'})
+ # PutObjectRetention
+ client.put_object_retention(Bucket=src_bucket_name, Key=name, Retention={'Mode': 'GOVERNANCE', 'RetainUntilDate': datetime.datetime(2026, 1, 1)})
+
+ _flush_logs(client, src_bucket_name)
+ response = client.list_objects_v2(Bucket=log_bucket_name)
+ log_keys = _get_keys(response)
+ assert len(log_keys) == 1
+ log_key = log_keys[0]
+ expected_op_list = ['REST.PUT.OBJECT', 'REST.PUT.ACL', 'REST.PUT.LEGAL_HOLD', 'REST.PUT.RETENTION', 'REST.PUT.OBJECT_TAGGING', 'REST.DELETE.OBJECT_TAGGING']
+ op_list = []
+ response = client.get_object(Bucket=log_bucket_name, Key=log_key)
+ body = _get_body(response)
+ for record in iter(body.splitlines()):
+ parsed_record = _parse_log_record(record, 'Journal')
+ logger.info('bucket log record: %s', json.dumps(parsed_record, indent=4))
+ op_list.append(parsed_record['Operation'])
+ version = parsed_record['VersionID']
+ if version != '-':
+ assert version == version_id
+
+ assert sorted(expected_op_list) == sorted(op_list)
+
+ # allow cleanup
+ client.put_object_legal_hold(Bucket=src_bucket_name, Key=name, LegalHold={'Status': 'OFF'})
+ client.delete_object(Bucket=src_bucket_name, Key=name, VersionId=version_id, BypassGovernanceRetention=True)
+
+
def _bucket_logging_cleanup(cleanup_type, logging_type, single_prefix, concurrency):
if not _has_bucket_logging_extension():
pytest.skip('ceph extension to bucket logging not supported at client')