record = record.replace('[', '"').replace(']', '"')
chunks = shlex.split(record)
assert len(chunks) == 26
- return json.dumps({
+ return {
'BucketOwner': chunks[0],
'BucketName': chunks[1],
'RequestDateTime': chunks[2],
'TLSVersion': chunks[23],
'AccessPointARN': chunks[24],
'ACLRequired': chunks[25],
- }, indent=4)
+ }
def _parse_journal_log_record(record):
record = record.replace('[', '"').replace(']', '"')
chunks = shlex.split(record)
assert len(chunks) == 8
- return json.dumps({
+ return {
'BucketOwner': chunks[0],
'BucketName': chunks[1],
'RequestDateTime': chunks[2],
'ObjectSize': chunks[5],
'VersionID': chunks[6],
'ETAG': chunks[7],
- }, indent=4)
+ }
def _parse_log_record(record, record_type):
if record_type == 'Standard':
expected_object_roll_time = 5
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def _verify_records(records, bucket_name, event_type, src_keys, record_type, expected_count, exact_match=False):
+ keys_found = []
+ all_keys = []
+ for record in iter(records.splitlines()):
+ parsed_record = _parse_log_record(record, record_type)
+ logger.info('bucket log record: %s', json.dumps(parsed_record, indent=4))
+ if bucket_name in record and event_type in record:
+ all_keys.append(parsed_record['Key'])
+ for key in src_keys:
+ if key in record:
+ keys_found.append(key)
+ break
+ logger.info('keys found in bucket log: %s', str(all_keys))
+ logger.info('keys from the source bucket: %s', str(src_keys))
+ if exact_match:
+ return len(keys_found) == expected_count and len(keys_found) == len(all_keys)
+ return len(keys_found) == expected_count
+
+
+def randcontent():
+ letters = string.ascii_lowercase
+ length = random.randint(10, 1024)
+ return ''.join(random.choice(letters) for i in range(length))
+
+
@pytest.mark.bucket_logging
def test_put_bucket_logging():
src_bucket_name = get_new_bucket_name()
assert response['LoggingEnabled'] == logging_enabled
+def _bucket_logging_key_filter(log_type):
+ src_bucket_name = get_new_bucket_name()
+ src_bucket = get_new_bucket_resource(name=src_bucket_name)
+ log_bucket_name = get_new_bucket_name()
+ log_bucket = get_new_bucket_resource(name=log_bucket_name)
+ client = get_client()
+
+ logging_enabled = {
+ 'TargetBucket': log_bucket_name,
+ 'LoggingType': log_type,
+ 'TargetPrefix': 'log/',
+ 'ObjectRollTime': expected_object_roll_time,
+ 'TargetObjectKeyFormat': {'SimplePrefix': {}},
+ 'RecordsBatchSize': 0,
+ 'Filter':
+ {
+ 'Key': {
+ 'FilterRules': [
+ {'Name': 'prefix', 'Value': 'test/'},
+ {'Name': 'suffix', 'Value': '.txt'}
+ ]
+ }
+ }
+ }
+
+ response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+ 'LoggingEnabled': logging_enabled,
+ })
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+ response = client.get_bucket_logging(Bucket=src_bucket_name)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ if log_type == 'Journal':
+ assert response['LoggingEnabled'] == logging_enabled
+ elif log_type == 'Standard':
+ print('TODO')
+ else:
+ assert False, 'unknown log type: %s' % log_type
+
+ names = []
+ num_keys = 5
+ for j in range(num_keys):
+ name = 'myobject'+str(j)
+ if log_type == 'Standard':
+ # standard log records are not filtered
+ names.append(name)
+ client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+
+ for j in range(num_keys):
+ name = 'test/'+'myobject'+str(j)+'.txt'
+ names.append(name)
+ client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+
+ expected_count = len(names)
+
+ time.sleep(expected_object_roll_time)
+ client.put_object(Bucket=src_bucket_name, Key='test/dummy.txt', Body='dummy')
+
+ response = client.list_objects_v2(Bucket=log_bucket_name)
+ keys = _get_keys(response)
+ assert len(keys) == 1
+
+ for key in keys:
+ assert key.startswith('log/')
+ response = client.get_object(Bucket=log_bucket_name, Key=key)
+ body = _get_body(response)
+ assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', names, log_type, expected_count, exact_match=True)
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_bucket_logging_key_filter_s():
+ if not _has_bucket_logging_extension():
+ pytest.skip('ceph extension to bucket logging not supported at client')
+ _bucket_logging_key_filter('Standard')
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_bucket_logging_key_filter_j():
+ if not _has_bucket_logging_extension():
+ pytest.skip('ceph extension to bucket logging not supported at client')
+ _bucket_logging_key_filter('Journal')
+
+
+def _bucket_logging_flush(log_type):
+ src_bucket_name = get_new_bucket_name()
+ src_bucket = get_new_bucket_resource(name=src_bucket_name)
+ log_bucket_name = get_new_bucket_name()
+ log_bucket = get_new_bucket_resource(name=log_bucket_name)
+ client = get_client()
+
+ logging_enabled = {
+ 'TargetBucket': log_bucket_name,
+ 'LoggingType': log_type,
+ 'TargetPrefix': 'log/',
+ 'ObjectRollTime': 300, # 5 minutes
+ 'TargetObjectKeyFormat': {'SimplePrefix': {}},
+ 'RecordsBatchSize': 0,
+ }
+
+ response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+ 'LoggingEnabled': logging_enabled,
+ })
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+ response = client.get_bucket_logging(Bucket=src_bucket_name)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+ if log_type == 'Journal':
+ assert response['LoggingEnabled'] == logging_enabled
+ elif log_type == 'Standard':
+ print('TODO')
+ else:
+ assert False, 'unknown log type: %s' % log_type
+
+ num_keys = 5
+ for j in range(num_keys):
+ name = 'myobject'+str(j)
+ client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+
+ response = client.list_objects_v2(Bucket=src_bucket_name)
+ src_keys = _get_keys(response)
+
+ response = client.post_bucket_logging(Bucket=src_bucket_name)
+ assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+ expected_count = num_keys
+
+ response = client.list_objects_v2(Bucket=log_bucket_name)
+ keys = _get_keys(response)
+ assert len(keys) == 1
+
+ for key in keys:
+ assert key.startswith('log/')
+ response = client.get_object(Bucket=log_bucket_name, Key=key)
+ body = _get_body(response)
+ assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, log_type, expected_count)
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_bucket_logging_flush_j():
+ if not _has_bucket_logging_extension():
+ pytest.skip('ceph extension to bucket logging not supported at client')
+ _bucket_logging_flush('Journal')
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_bucket_logging_flush_s():
+ if not _has_bucket_logging_extension():
+ pytest.skip('ceph extension to bucket logging not supported at client')
+ _bucket_logging_flush('Standard')
+
+
@pytest.mark.bucket_logging
def test_put_bucket_logging_errors():
src_bucket_name = get_new_bucket_name()
assert response['LoggingEnabled'] == logging_enabled
-import logging
-
-logger = logging.getLogger(__name__)
-
-def _verify_records(records, bucket_name, event_type, src_keys, record_type, expected_count):
- keys_found = []
- for record in iter(records.splitlines()):
- logger.info('bucket log record: %s', _parse_log_record(record, record_type))
- if bucket_name in record and event_type in record:
- for key in src_keys:
- if key in record:
- keys_found.append(key)
- break
- logger.info('keys found in bucket log: %s', str(keys_found))
- logger.info('keys from the source bucket: %s', str(src_keys))
- return len(keys_found) == expected_count
-
-def randcontent():
- letters = string.ascii_lowercase
- length = random.randint(10, 1024)
- return ''.join(random.choice(letters) for i in range(length))
-
-
def _bucket_logging_put_objects(versioned):
src_bucket_name = get_new_bucket()
if versioned: