def _flush_logs(client, src_bucket_name, dummy_key="dummy"):
if _has_bucket_logging_extension():
- client.post_bucket_logging(Bucket=src_bucket_name)
+ result = client.post_bucket_logging(Bucket=src_bucket_name)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ return result['FlushedLoggingObject']
else:
time.sleep(expected_object_roll_time*1.1)
client.put_object(Bucket=src_bucket_name, Key=dummy_key, Body='dummy')
+ return None
def _bucket_logging_key_filter(log_type):
expected_count = len(names)
- _flush_logs(client, src_bucket_name, dummy_key="test/dummy.txt")
+ flushed_obj = _flush_logs(client, src_bucket_name, dummy_key="test/dummy.txt")
response = client.list_objects_v2(Bucket=log_bucket_name)
keys = _get_keys(response)
assert len(keys) == 1
for key in keys:
+ if flushed_obj is not None:
+ assert key == flushed_obj
assert key.startswith('log/')
response = client.get_object(Bucket=log_bucket_name, Key=key)
body = _get_body(response)
_bucket_logging_key_filter('Journal')
+def _post_bucket_logging(client, src_bucket_name, flushed_objs):
+ result = client.post_bucket_logging(Bucket=src_bucket_name)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ flushed_objs[src_bucket_name] = result['FlushedLoggingObject']
+
+
def _bucket_logging_flush(logging_type, single_prefix, concurrency):
if not _has_bucket_logging_extension():
pytest.skip('ceph extension to bucket logging not supported at client')
assert len(keys) == 0
t = []
+ flushed_objs = {}
for src_bucket_name in buckets:
if concurrency:
- thr = threading.Thread(target = client.post_bucket_logging,
- kwargs={'Bucket': src_bucket_name})
+ thr = threading.Thread(target = _post_bucket_logging,
+ args=(client, src_bucket_name, flushed_objs))
thr.start()
t.append(thr)
else:
- client.post_bucket_logging(Bucket=src_bucket_name)
- if single_prefix and logging_type == 'Standard':
- # in case of single prefix we flush only once
- # because flushing itself will be logged
- # and the next flush will commit the log
+ result = client.post_bucket_logging(Bucket=src_bucket_name)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ flushed_objs[src_bucket_name] = result['FlushedLoggingObject']
+ if single_prefix:
break
_do_wait_completion(t)
if single_prefix:
assert len(keys) == 1
+ assert len(flushed_objs) == 1
else:
- assert len(keys) >= num_buckets
+ assert len(keys) == num_buckets
+ assert len(flushed_objs) >= num_buckets
for key in keys:
response = client.get_object(Bucket=log_bucket_name, Key=key)
for j in range(num_buckets):
prefix = log_prefixes[j]
if key.startswith(prefix):
+ flushed_obj = flushed_objs.get(buckets[j])
+ if flushed_obj is not None:
+ assert key == flushed_obj
found = True
assert _verify_records(body, buckets[j], 'REST.PUT.OBJECT', src_names, logging_type, num_keys)
assert _verify_records(body, buckets[j], 'REST.DELETE.OBJECT', src_names, logging_type, num_keys)
response = src_client.list_objects_v2(Bucket=src_bucket_name)
src_keys = _get_keys(response)
- _flush_logs(src_client, src_bucket_name)
+ flushed_obj = _flush_logs(src_client, src_bucket_name)
response = log_client.list_objects_v2(Bucket=log_bucket_name)
keys = _get_keys(response)
assert len(keys) == 1
for key in keys:
+ if flushed_obj is not None:
+ assert key == flushed_obj
assert key.startswith('log/')
response = log_client.get_object(Bucket=log_bucket_name, Key=key)
body = _get_body(response)
response = client.list_objects_v2(Bucket=src_bucket_name)
src_keys = _get_keys(response)
- _flush_logs(client, src_bucket_name)
+ flushed_obj = _flush_logs(client, src_bucket_name)
response = client.list_objects_v2(Bucket=log_bucket_name)
keys = _get_keys(response)
record_type = 'Standard' if not has_extensions else 'Journal'
for key in keys:
+ if flushed_obj is not None:
+ assert key == flushed_obj
assert key.startswith('log/')
response = client.get_object(Bucket=log_bucket_name, Key=key)
body = _get_body(response)