FuncT = TypeVar('FuncT', bound=Callable)
+def validate_non_empty_string(value: Optional[str], field_name: str) -> None:
+ if not isinstance(value, str) or not value.strip():
+ raise SpecValidationError(f"Invalid {field_name}: Must be a non-empty string.")
+
+
class TLSBlock(TypedDict, total=False):
ssl: bool
certificate_source: str
def validate(self) -> None:
super(OAuth2ProxySpec, self).validate()
+
required_values = {
'provider_display_name': self.provider_display_name,
'oidc_issuer_url': self.oidc_issuer_url,
+ ', '.join(missing_required_fields)
+ '.'
)
- self._validate_non_empty_string(self.provider_display_name, "provider_display_name")
- self._validate_non_empty_string(self.client_id, "client_id")
- self._validate_non_empty_string(self.client_secret, "client_secret")
+ validate_non_empty_string(self.provider_display_name, "provider_display_name")
+ validate_non_empty_string(self.client_id, "client_id")
+ validate_non_empty_string(self.client_secret, "client_secret")
self._validate_cookie_secret(self.cookie_secret)
self._validate_url(self.oidc_issuer_url, "oidc_issuer_url")
if self.redirect_url is not None:
if self.https_address is not None:
self._validate_https_address(self.https_address)
- def _validate_non_empty_string(self, value: Optional[str], field_name: str) -> None:
- if not value or not isinstance(value, str) or not value.strip():
- raise SpecValidationError(f"Invalid {field_name}: Must be a non-empty string.")
-
def _validate_url(self, url: Optional[str], field_name: str) -> None:
from urllib.parse import urlparse
try:
if 'profile_name' not in spec:
raise SpecValidationError('Tuned profile spec must include "profile_name" field')
data['profile_name'] = spec['profile_name']
- if not isinstance(data['profile_name'], str):
- raise SpecValidationError('"profile_name" field must be a string')
+ validate_non_empty_string(data['profile_name'], "profile_name")
if 'placement' in spec:
data['placement'] = PlacementSpec.from_json(spec['placement'])
if 'settings' in spec:
RGWSpec,
ServiceSpec,
YamlLiteralString,
+ TunedProfileSpec,
)
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.hostspec import SpecValidationError
assert 'ssl_cert: |' in dumped
assert 'ssl_key: |' in dumped
+
+# Tuned profile spec (e.g. ceph orch tuned-profile apply -i os-tune.spec)
+VALID_TUNED_PROFILE_SPEC = """
+profile_name: os-tune
+placement:
+ hosts:
+ - ceph-node-0
+ - ceph-node-1
+ - ceph-node-2
+"""
+
+EMPTY_PROFILE_NAME_SPEC = """
+profile_name: ''
+placement:
+ hosts:
+ - ceph-node-0
+ - ceph-node-1
+ - ceph-node-2
+"""
+
+MISSING_PROFILE_NAME_SPEC = """
+placement:
+ hosts:
+ - ceph-node-0
+ - ceph-node-1
+ - ceph-node-2
+"""
+
+
+@pytest.mark.parametrize("spec_yaml, expect_error, error_match", [
+ (EMPTY_PROFILE_NAME_SPEC, True, r'Invalid profile_name: Must be a non-empty string\.'),
+ (MISSING_PROFILE_NAME_SPEC, True, r'Tuned profile spec must include "profile_name" field'),
+ (VALID_TUNED_PROFILE_SPEC, False, None),
+])
+def test_tuned_profile_spec_profile_name_validation(spec_yaml, expect_error, error_match):
+ """Test TunedProfileSpec.from_json validation for profile_name (ceph orch tuned-profile apply -i <spec>)."""
+ data = yaml.safe_load(spec_yaml)
+ if expect_error:
+ with pytest.raises(SpecValidationError, match=error_match):
+ TunedProfileSpec.from_json(data)
+ else:
+ spec = TunedProfileSpec.from_json(data)
+ assert spec.profile_name == 'os-tune'
+ assert spec.placement is not None
+ # round-trip
+ assert TunedProfileSpec.from_json(spec.to_json()).profile_name == spec.profile_name