_bucket_logging_flush('Standard', True, True)
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_bucket_logging_put_and_flush():
+ if not _has_bucket_logging_extension():
+ pytest.skip('ceph extension to bucket logging not supported at client')
+ log_bucket_name = get_new_bucket_name()
+ log_bucket = get_new_bucket_resource(name=log_bucket_name)
+ client_config = botocore.config.Config(max_pool_connections=100)
+ client = get_client(client_config=client_config)
+
+ src_bucket_name = get_new_bucket_name()
+ src_bucket = get_new_bucket_resource(name=src_bucket_name)
+ log_prefix = 'log/'
+ _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [log_prefix])
+
+ logging_enabled = {'TargetBucket': log_bucket_name,
+ 'LoggingType': 'Journal',
+ 'TargetPrefix': log_prefix
+ }
+
+ response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+ 'LoggingEnabled': logging_enabled,
+ })
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+ num_keys = 300
+ flush_rate = 10
+ put_threads = []
+ flush_threads = []
+ src_names = []
+ for j in range(num_keys):
+ name = 'myobject'+str(j)
+ src_names.append(name)
+ put_thr = threading.Thread(target=client.put_object,
+ kwargs={'Bucket': src_bucket_name,
+ 'Key': name,
+ 'Body': randcontent()})
+ put_thr.start()
+ put_threads.append(put_thr)
+
+ if j % flush_rate == 0:
+ flush_thr = threading.Thread(target = client.post_bucket_logging,
+ kwargs={'Bucket': src_bucket_name})
+ flush_thr.start()
+ flush_threads.append(flush_thr)
+
+ _do_wait_completion(put_threads)
+ _do_wait_completion(flush_threads)
+
+ # making sure everything is flushed synchronously
+ _flush_logs(client, src_bucket_name)
+
+ response = client.list_objects_v2(Bucket=log_bucket_name)
+ keys = _get_keys(response)
+
+ body = ''
+ prev_key = ''
+ for key in keys:
+ logger.info('bucket log record: %s', key)
+ assert key > prev_key
+ prev_key = key
+ response = client.get_object(Bucket=log_bucket_name, Key=key)
+ body += _get_body(response)
+ assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_names, 'Journal', num_keys)
+
+
@pytest.mark.bucket_logging
def test_put_bucket_logging_errors():
src_bucket_name = get_new_bucket_name()