rgw/logging: tests for flush API (606/head)
author    Yuval Lifshitz <ylifshit@ibm.com>  Wed, 11 Dec 2024 20:54:21 +0000 (20:54 +0000)
committer Yuval Lifshitz <ylifshit@ibm.com>  Wed, 11 Dec 2024 20:54:21 +0000 (20:54 +0000)
as well as tests for object key filters

Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
s3tests_boto3/functional/test_s3.py

index 185b5ae296c734a07198c9d90eb6c157b53098db..7247f2ff712fbf368c1058f5fdbf02882fd0f30b 100644
@@ -13946,7 +13946,7 @@ def _parse_standard_log_record(record):
     record = record.replace('[', '"').replace(']', '"')
     chunks = shlex.split(record)
     assert len(chunks) == 26
-    return json.dumps({
+    return {
             'BucketOwner':      chunks[0],
             'BucketName':       chunks[1],
             'RequestDateTime':  chunks[2],
@@ -13973,14 +13973,14 @@ def _parse_standard_log_record(record):
             'TLSVersion':       chunks[23],
             'AccessPointARN':   chunks[24],
             'ACLRequired':      chunks[25],
-            }, indent=4)
+            }
 
 
 def _parse_journal_log_record(record):
     record = record.replace('[', '"').replace(']', '"')
     chunks = shlex.split(record)
     assert len(chunks) == 8
-    return json.dumps({
+    return {
             'BucketOwner':      chunks[0],
             'BucketName':       chunks[1],
             'RequestDateTime':  chunks[2],
@@ -13989,7 +13989,7 @@ def _parse_journal_log_record(record):
             'ObjectSize':       chunks[5],
             'VersionID':        chunks[6],
             'ETAG':             chunks[7],
-            }, indent=4)
+            }
 
 def _parse_log_record(record, record_type):
     if record_type == 'Standard':
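
Aside: the parsers above lean on shlex; rewriting '[' and ']' as double quotes makes shlex.split() keep a bracketed field, e.g. the request timestamp, together as a single chunk. A minimal standalone illustration (not part of the diff):

    import shlex

    record = '[11/Dec/2024:20:54:21 +0000] REST.PUT.OBJECT'
    chunks = shlex.split(record.replace('[', '"').replace(']', '"'))
    # the bracketed timestamp survives as a single chunk
    assert chunks == ['11/Dec/2024:20:54:21 +0000', 'REST.PUT.OBJECT']
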
@@ -14001,6 +14001,36 @@ def _parse_log_record(record, record_type):
 
 expected_object_roll_time = 5
 
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def _verify_records(records, bucket_name, event_type, src_keys, record_type, expected_count, exact_match=False):
+    # scan the bucket log for records matching bucket_name and event_type and
+    # verify that expected_count of the src_keys appear; with exact_match, also
+    # require that no other keys were logged
+    keys_found = []
+    all_keys = []
+    for record in records.splitlines():
+        parsed_record = _parse_log_record(record, record_type)
+        logger.info('bucket log record: %s', json.dumps(parsed_record, indent=4))
+        if bucket_name in record and event_type in record:
+            all_keys.append(parsed_record['Key'])
+            for key in src_keys:
+                if key in record:
+                    keys_found.append(key)
+                    break
+    logger.info('keys found in bucket log: %s', str(all_keys))
+    logger.info('keys from the source bucket: %s', str(src_keys))
+    if exact_match:
+        return len(keys_found) == expected_count and len(keys_found) == len(all_keys)
+    return len(keys_found) == expected_count
+
+
+def randcontent():
+    # random lowercase body, 10-1024 characters
+    letters = string.ascii_lowercase
+    length = random.randint(10, 1024)
+    return ''.join(random.choice(letters) for _ in range(length))
+
+
 @pytest.mark.bucket_logging
 def test_put_bucket_logging():
     src_bucket_name = get_new_bucket_name()
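
Aside: the exact_match flag added to _verify_records above tightens the check from "all expected keys were logged" to "and nothing else was". The final comparison reduces to the following standalone sketch (hypothetical key lists, not part of the diff):

    def check(keys_found, all_keys, expected_count, exact_match=False):
        # mirrors the return logic of _verify_records
        if exact_match:
            return len(keys_found) == expected_count and len(keys_found) == len(all_keys)
        return len(keys_found) == expected_count

    assert check(['a', 'b'], ['a', 'b'], 2, exact_match=True)   # nothing but the expected keys
    assert check(['a', 'b'], ['a', 'b', 'stray'], 2)            # loose mode tolerates a stray key
    assert not check(['a', 'b'], ['a', 'b', 'stray'], 2, exact_match=True)
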
@@ -14105,6 +14135,161 @@ def test_put_bucket_logging():
     assert response['LoggingEnabled'] == logging_enabled
 
 
+def _bucket_logging_key_filter(log_type):
+    src_bucket_name = get_new_bucket_name()
+    src_bucket = get_new_bucket_resource(name=src_bucket_name)
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    
+    logging_enabled = {
+            'TargetBucket': log_bucket_name,
+            'LoggingType': log_type,
+            'TargetPrefix': 'log/',
+            'ObjectRollTime': expected_object_roll_time,
+            'TargetObjectKeyFormat': {'SimplePrefix': {}},
+            'RecordsBatchSize': 0,
+            'Filter': {
+                'Key': {
+                    'FilterRules': [
+                        {'Name': 'prefix', 'Value': 'test/'},
+                        {'Name': 'suffix', 'Value': '.txt'}
+                    ]
+                }
+            },
+            }
+
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    response = client.get_bucket_logging(Bucket=src_bucket_name)
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    if log_type == 'Journal':
+        assert response['LoggingEnabled'] == logging_enabled
+    elif log_type == 'Standard':
+        print('TODO')
+    else:
+        assert False, 'unknown log type: %s' % log_type
+    
+    names = []
+    num_keys = 5
+    for j in range(num_keys):
+        name = 'myobject'+str(j)    
+        if log_type == 'Standard':
+            # standard log records are not filtered
+            names.append(name)
+        client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+   
+    for j in range(num_keys):
+        name = 'test/'+'myobject'+str(j)+'.txt'
+        names.append(name)
+        client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+
+    expected_count = len(names)
+
+    time.sleep(expected_object_roll_time)
+    # once the roll time has elapsed, another put triggers the pending log object to be committed
+    client.put_object(Bucket=src_bucket_name, Key='test/dummy.txt', Body='dummy')
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    keys = _get_keys(response)
+    assert len(keys) == 1
+
+    for key in keys:
+        assert key.startswith('log/')
+        response = client.get_object(Bucket=log_bucket_name, Key=key)
+        body = _get_body(response)
+        assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', names, log_type, expected_count, exact_match=True)
+
+
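
Aside: the two filter rules above combine as a logical AND, and the test encodes that Journal mode drops non-matching keys while Standard mode logs everything. A plain-Python sketch of the key matching the test relies on (an illustration, not the server-side implementation):

    def _matches(key, prefix='test/', suffix='.txt'):
        # a journal record is only emitted when both rules hold
        return key.startswith(prefix) and key.endswith(suffix)

    assert _matches('test/myobject0.txt')
    assert not _matches('myobject0')            # fails the prefix rule
    assert not _matches('test/myobject0.jpeg')  # hypothetical key failing the suffix rule
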
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_bucket_logging_key_filter_s():
+    if not _has_bucket_logging_extension():
+        pytest.skip('ceph extension to bucket logging not supported at client')
+    _bucket_logging_key_filter('Standard')
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_bucket_logging_key_filter_j():
+    if not _has_bucket_logging_extension():
+        pytest.skip('ceph extension to bucket logging not supported at client')
+    _bucket_logging_key_filter('Journal')
+
+
+def _bucket_logging_flush(log_type):
+    src_bucket_name = get_new_bucket_name()
+    src_bucket = get_new_bucket_resource(name=src_bucket_name)
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+    
+    logging_enabled = {
+            'TargetBucket': log_bucket_name, 
+            'LoggingType': log_type,
+            'TargetPrefix': 'log/',
+            'ObjectRollTime': 300, # 5 minutes
+            'TargetObjectKeyFormat': {'SimplePrefix': {}},
+            'RecordsBatchSize': 0,
+            }
+
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    response = client.get_bucket_logging(Bucket=src_bucket_name)
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    if log_type == 'Journal':
+        assert response['LoggingEnabled'] == logging_enabled
+    elif log_type == 'Standard':
+        print('TODO')
+    else:
+        assert False, 'unknown log type: %s' % log_type
+    
+    num_keys = 5
+    for j in range(num_keys):
+        name = 'myobject'+str(j)    
+        client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+   
+    response = client.list_objects_v2(Bucket=src_bucket_name)
+    src_keys = _get_keys(response)
+    
+    # flush the pending log records explicitly instead of waiting for ObjectRollTime (300s here)
+    response = client.post_bucket_logging(Bucket=src_bucket_name)
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    expected_count = num_keys
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    keys = _get_keys(response)
+    assert len(keys) == 1
+
+    for key in keys:
+        assert key.startswith('log/')
+        response = client.get_object(Bucket=log_bucket_name, Key=key)
+        body = _get_body(response)
+        assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, log_type, expected_count)
+
+
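
Aside: ObjectRollTime is deliberately 300 seconds here, so the single log object found by the listing can only exist because the explicit flush committed it; without post_bucket_logging the log bucket would still be empty. Reduced to its essentials (names taken from the helper above):

    # write an object, then force the pending log records to be committed
    client.put_object(Bucket=src_bucket_name, Key='myobject0', Body=randcontent())
    response = client.post_bucket_logging(Bucket=src_bucket_name)
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    # the committed log object is now listable without waiting for the roll time
    keys = _get_keys(client.list_objects_v2(Bucket=log_bucket_name))
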
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_bucket_logging_flush_j():
+    if not _has_bucket_logging_extension():
+        pytest.skip('ceph extension to bucket logging not supported at client')
+    _bucket_logging_flush('Journal')
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.fails_on_aws
+def test_bucket_logging_flush_s():
+    if not _has_bucket_logging_extension():
+        pytest.skip('ceph extension to bucket logging not supported at client')
+    _bucket_logging_flush('Standard')
+
+
 @pytest.mark.bucket_logging
 def test_put_bucket_logging_errors():
     src_bucket_name = get_new_bucket_name()
@@ -14234,29 +14419,6 @@ def test_put_bucket_logging_extensions():
     assert response['LoggingEnabled'] == logging_enabled
 
 
-import logging
-
-logger = logging.getLogger(__name__)
-
-def _verify_records(records, bucket_name, event_type, src_keys, record_type, expected_count):
-    keys_found = []
-    for record in iter(records.splitlines()):
-        logger.info('bucket log record: %s', _parse_log_record(record, record_type))
-        if bucket_name in record and event_type in record:
-            for key in src_keys:
-                if key in record:
-                    keys_found.append(key)
-                    break
-    logger.info('keys found in bucket log: %s', str(keys_found))
-    logger.info('keys from the source bucket: %s', str(src_keys))
-    return len(keys_found) == expected_count
-
-def randcontent():
-    letters = string.ascii_lowercase
-    length = random.randint(10, 1024)
-    return ''.join(random.choice(letters) for i in range(length))
-
-
 def _bucket_logging_put_objects(versioned):
     src_bucket_name = get_new_bucket()
     if versioned: