From 7ffa9fa84ac0958b2cdad72e7d164edf257107a4 Mon Sep 17 00:00:00 2001
From: Yuval Lifshitz
Date: Wed, 10 Sep 2025 09:48:48 +0000
Subject: [PATCH] rgw/logging: add conf change specific tests and also test updates while writing logs

fix: cover all src buckets in cleanup tests

Signed-off-by: Yuval Lifshitz
---
 s3tests_boto3/functional/test_s3.py | 206 +++++++++++++++++++++++++++-
 1 file changed, 202 insertions(+), 4 deletions(-)

diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py
index 5a5f2f77..4b4a8cfb 100644
--- a/s3tests_boto3/functional/test_s3.py
+++ b/s3tests_boto3/functional/test_s3.py
@@ -14988,7 +14988,7 @@ def _verify_records(records, bucket_name, event_type, src_keys, record_type, exp
     all_keys = []
     for record in iter(records.splitlines()):
         parsed_record = _parse_log_record(record, record_type)
-        logger.info('bucket log record: %s', json.dumps(parsed_record, indent=4))
+        logger.debug('bucket log record: %s', json.dumps(parsed_record, indent=4))
         if bucket_name in record and event_type in record:
             all_keys.append(parsed_record['Key'])
     for key in src_keys:
@@ -15001,6 +15001,7 @@ def _verify_records(records, bucket_name, event_type, src_keys, record_type, exp
         return len(keys_found) == expected_count and len(keys_found) == len(all_keys)
     return len(keys_found) == expected_count
 
+
 def _verify_record_field(records, bucket_name, event_type, object_key, record_type, field_name, expected_value):
     for record in iter(records.splitlines()):
         if bucket_name in record and event_type in record and object_key in record:
@@ -15013,6 +15014,7 @@ def _verify_record_field(records, bucket_name, event_type, object_key, record_ty
             return False
     return False
 
+
 def randcontent():
     letters = string.ascii_lowercase
     length = random.randint(10, 1024)
@@ -17578,6 +17580,7 @@ def test_bucket_logging_object_meta():
         client.put_object_legal_hold(Bucket=src_bucket_name, Key=name, LegalHold={'Status': 'OFF'})
         client.delete_object(Bucket=src_bucket_name, Key=name, VersionId=version_id, BypassGovernanceRetention=True)
 
+
 def _verify_flushed_on_put(result):
     if _has_bucket_logging_extension():
         assert result['ResponseMetadata']['HTTPStatusCode'] == 200
@@ -17676,7 +17679,6 @@ def _bucket_logging_cleanup(cleanup_type, logging_type, single_prefix, concurren
     elif cleanup_type != 'target':
         assert False, 'invalid cleanup type: ' + cleanup_type
 
-    exact_match = False
     _do_wait_completion(t)
 
     if cleanup_type == 'target':
@@ -17739,8 +17741,10 @@ def _bucket_logging_cleanup(cleanup_type, logging_type, single_prefix, concurren
             found = True
         assert found, 'log key does not match any expected prefix: ' + key + ' expected prefixes: ' + str(prefixes)
 
-    assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_names, logging_type, num_keys, exact_match)
-    assert _verify_records(body, src_bucket_name, 'REST.DELETE.OBJECT', src_names, logging_type, num_keys, exact_match)
+    exact_match = True
+    for src_bucket_name in buckets:
+        assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_names, logging_type, num_keys, exact_match)
+        assert _verify_records(body, src_bucket_name, 'REST.DELETE.OBJECT', src_names, logging_type, num_keys, exact_match)
 
 
 @pytest.mark.bucket_logging
@@ -17967,6 +17971,200 @@ def test_bucket_logging_cleanup_concurrent_updating_s_single():
     _bucket_logging_cleanup('updating', 'Standard', True, True)
 
 
+def _bucket_logging_conf_update(logging_type, update_value, concurrency):
+    if not _has_bucket_logging_extension():
+        pytest.skip('ceph extension to bucket logging not supported at client')
+    log_bucket_name = get_new_bucket_name()
+    log_bucket = get_new_bucket_resource(name=log_bucket_name)
+    client = get_client()
+
+    prefix = 'log/'
+    longer_time = expected_object_roll_time*10
+    src_bucket_name = get_new_bucket_name()
+    src_bucket = get_new_bucket_resource(name=src_bucket_name)
+
+    _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
+
+    logging_enabled = {'TargetBucket': log_bucket_name,
+                       'ObjectRollTime': longer_time,
+                       'LoggingType': logging_type,
+                       'TargetPrefix': prefix}
+    response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+
+    # conf update without any log records
+    if update_value == "roll_time":
+        logging_enabled['ObjectRollTime'] = longer_time*2
+    elif update_value == "prefix":
+        prefix = 'newlog1/'
+        _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
+        logging_enabled['TargetPrefix'] = prefix
+    else:
+        assert False, 'invalid update value: ' + update_value
+
+    result = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+    assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    num_keys = 100
+    src_names = []
+    for j in range(num_keys):
+        src_names.append('myobject'+str(j))
+
+    def put_object_with_exception_handling(bucket, key, body):
+        try:
+            client.put_object(Bucket=bucket, Key=key, Body=body)
+        except Exception as e:
+            logger.warning(f'put_object failed for {key}: {e}')
+
+    t = []
+    for name in src_names:
+        if concurrency:
+            thr = threading.Thread(target=put_object_with_exception_handling,
+                                   args=(src_bucket_name, name, randcontent()))
+            thr.start()
+            t.append(thr)
+        else:
+            try:
+                client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+            except Exception as e:
+                logger.warning(f'put_object failed for {name}: {e}')
+
+    if update_value == "roll_time":
+        logging_enabled['ObjectRollTime'] = longer_time*3
+    elif update_value == "prefix":
+        prefix = 'newlog2/'
+        _set_log_bucket_policy(client, log_bucket_name, [src_bucket_name], [prefix])
+        logging_enabled['TargetPrefix'] = prefix
+    else:
+        assert False, 'invalid update value: ' + update_value
+    result = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
+        'LoggingEnabled': logging_enabled,
+    })
+
+    flushed_obj = _verify_flushed_on_put(result)
+    logger.info('flushed log object after conf update: %s', flushed_obj)
+
+    # the conf update above was done while puts were still in flight; wait for them to complete
+    _do_wait_completion(t)
+
+    expected_count = num_keys
+    expected_log_objs = 2
+    if concurrency:
+        # some records may be written after the conf change
+        _flush_logs(client, src_bucket_name)
+        expected_log_objs = 3
+        if update_value == 'prefix' and logging_type == 'Journal':
+            # when changing prefix in concurrency mode
+            # some put operations may fail, so the list of source objects
+            # needs to be taken from the bucket
+            response = client.list_objects_v2(Bucket=src_bucket_name)
+            src_names = _get_keys(response)
+            expected_count = len(src_names)
+
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    keys = _get_keys(response)
+    # one empty log object due to the initial conf change,
+    # one with the log records due to the 2nd conf change,
+    # and, in the concurrency case, a third from the explicit flush
+    assert len(keys) == expected_log_objs
+    assert flushed_obj in keys
+
+    exact_match = True
+    body = ""
+    for key in keys:
+        logger.info('processing log object: %s', key)
+        response = client.get_object(Bucket=log_bucket_name, Key=key)
+        body += _get_body(response)
+        # delete the key that we already processed
+        client.delete_object(Bucket=log_bucket_name, Key=key)
+
+    ok = _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_names, logging_type, expected_count, exact_match)
+    if not (concurrency and update_value == 'prefix' and logging_type == 'Standard'):
+        # we can have silent failures when changing the prefix in concurrency mode with standard logging
+        assert ok
+
+    # make sure that logging still works after conf change
+    src_names = []
+    for j in range(num_keys):
+        src_names.append('anotherobject'+str(j))
+    for name in src_names:
+        client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())
+
+    flushed_obj = _flush_logs(client, src_bucket_name)
+    response = client.list_objects_v2(Bucket=log_bucket_name)
+    keys = _get_keys(response)
+    assert len(keys) == 1
+    assert flushed_obj in keys
+
+    body = ""
+    for key in keys:
+        response = client.get_object(Bucket=log_bucket_name, Key=key)
+        body += _get_body(response)
+
+    assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_names, logging_type, num_keys, exact_match)
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.bucket_logging_cleanup
+@pytest.mark.fails_on_aws
+def test_bucket_logging_conf_updating_roll_s():
+    _bucket_logging_conf_update('Standard', 'roll_time', False)
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.bucket_logging_cleanup
+@pytest.mark.fails_on_aws
+def test_bucket_logging_conf_updating_roll_j():
+    _bucket_logging_conf_update('Journal', 'roll_time', False)
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.bucket_logging_cleanup
+@pytest.mark.fails_on_aws
+def test_bucket_logging_conf_updating_pfx_s():
+    _bucket_logging_conf_update('Standard', 'prefix', False)
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.bucket_logging_cleanup
+@pytest.mark.fails_on_aws
+def test_bucket_logging_conf_updating_pfx_j():
+    _bucket_logging_conf_update('Journal', 'prefix', False)
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.bucket_logging_cleanup
+@pytest.mark.fails_on_aws
+def test_bucket_logging_conf_concurrent_updating_roll_s():
+    _bucket_logging_conf_update('Standard', 'roll_time', True)
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.bucket_logging_cleanup
+@pytest.mark.fails_on_aws
+def test_bucket_logging_conf_concurrent_updating_roll_j():
+    _bucket_logging_conf_update('Journal', 'roll_time', True)
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.bucket_logging_cleanup
+@pytest.mark.fails_on_aws
+def test_bucket_logging_conf_concurrent_updating_pfx_s():
+    _bucket_logging_conf_update('Standard', 'prefix', True)
+
+
+@pytest.mark.bucket_logging
+@pytest.mark.bucket_logging_cleanup
+@pytest.mark.fails_on_aws
+def test_bucket_logging_conf_concurrent_updating_pfx_j():
+    _bucket_logging_conf_update('Journal', 'prefix', True)
+
+
 def check_parts_count(parts, expected):
     # AWS docs disagree on the name of this element
     if 'TotalPartsCount' in parts:
-- 
2.39.5