assert response['LoggingEnabled'] == logging_enabled
+ @pytest.mark.bucket_logging
+ @pytest.mark.fails_on_aws
+ def test_bucket_logging_mtime():
+ src_bucket_name = get_new_bucket_name()
+ src_bucket = get_new_bucket_resource(name=src_bucket_name)
+ log_bucket_name = get_new_bucket_name()
+ log_bucket = get_new_bucket_resource(name=log_bucket_name)
+ client = get_client()
+ prefix = 'log/'
+ _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
+ # minimal configuration
+ logging_enabled = {
+ 'TargetBucket': log_bucket_name,
+ 'TargetPrefix': prefix
+ }
+
+ response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+ 'LoggingEnabled': logging_enabled,
+ })
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ response = client.get_bucket_logging(Bucket=src_bucket_name)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ mtime = response['ResponseMetadata']['HTTPHeaders']['last-modified']
+
+ # wait and set the same conf - mtime should be the same
+ time.sleep(1)
+ response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+ 'LoggingEnabled': logging_enabled,
+ })
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ response = client.get_bucket_logging(Bucket=src_bucket_name)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ assert mtime == response['ResponseMetadata']['HTTPHeaders']['last-modified']
+
+ # wait and change conf - new mtime should be larger
+ time.sleep(1)
+ prefix = 'another-log/'
+ _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
+ # minimal configuration
+ logging_enabled = {
+ 'TargetBucket': log_bucket_name,
+ 'TargetPrefix': prefix
+ }
+
+ response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+ 'LoggingEnabled': logging_enabled,
+ })
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ response = client.get_bucket_logging(Bucket=src_bucket_name)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ assert mtime < response['ResponseMetadata']['HTTPHeaders']['last-modified']
+ mtime = response['ResponseMetadata']['HTTPHeaders']['last-modified']
+
+ # wait and disable/enable conf - new mtime should be larger
+ time.sleep(1)
+ response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={})
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+ 'LoggingEnabled': logging_enabled,
+ })
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ response = client.get_bucket_logging(Bucket=src_bucket_name)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ assert mtime < response['ResponseMetadata']['HTTPHeaders']['last-modified']
+
+
def _flush_logs(client, src_bucket_name, dummy_key="dummy"):
if _has_bucket_logging_extension():
- client.post_bucket_logging(Bucket=src_bucket_name)
+ result = client.post_bucket_logging(Bucket=src_bucket_name)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ return result['FlushedLoggingObject']
else:
time.sleep(expected_object_roll_time*1.1)
client.put_object(Bucket=src_bucket_name, Key=dummy_key, Body='dummy')