from ceph.deployment.inventory import Device
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, \
- SNMPGatewaySpec, SNMPVersion, SNMPAuthType, SNMPPrivacyType
+ SNMPGatewaySpec
from ceph.deployment.hostspec import SpecValidationError
from ceph.utils import datetime_now
@_cli_write_command('orch apply snmp-gateway')
def _apply_snmp_gateway(self,
- snmp_version: SNMPVersion,
+ snmp_version: SNMPGatewaySpec.SNMPVersion,
destination: str,
port: int = 9464,
engine_id: Optional[str] = None,
- auth_protocol: Optional[SNMPAuthType] = None,
- privacy_protocol: Optional[SNMPPrivacyType] = None,
+ auth_protocol: Optional[SNMPGatewaySpec.SNMPAuthType] = None,
+ privacy_protocol: Optional[SNMPGatewaySpec.SNMPPrivacyType] = None,
placement: Optional[str] = None,
unmanaged: bool = False,
dry_run: bool = False,
except (OSError, yaml.YAMLError):
raise OrchestratorValidationError('credentials file must be valid YAML')
- auth = None if not auth_protocol else auth_protocol.value
- priv = None if not privacy_protocol else privacy_protocol.value
-
spec = SNMPGatewaySpec(
- snmp_version=snmp_version.value,
+ snmp_version=snmp_version,
port=port,
credentials=credentials,
snmp_destination=destination,
engine_id=engine_id,
- auth_protocol=auth,
- privacy_protocol=priv,
+ auth_protocol=auth_protocol,
+ privacy_protocol=privacy_protocol,
placement=PlacementSpec.from_string(placement),
unmanaged=unmanaged,
preview_only=dry_run
ServiceSpecT = TypeVar('ServiceSpecT', bound='ServiceSpec')
FuncT = TypeVar('FuncT', bound=Callable)
-class SNMPVersion(enum.Enum):
- V2c = 'V2c'
- V3 = 'V3'
-
-
-class SNMPAuthType(enum.Enum):
- MD5 = 'MD5'
- SHA = 'SHA'
-
-
-class SNMPPrivacyType(enum.Enum):
- DES = 'DES'
- AES = 'AES'
-
def assert_valid_host(name: str) -> None:
p = re.compile('^[a-zA-Z0-9-]+$')
class SNMPGatewaySpec(ServiceSpec):
+ class SNMPVersion(str, enum.Enum):
+ V2c = 'V2c'
+ V3 = 'V3'
+
+ def to_json(self) -> str:
+ return self.value
+
+ class SNMPAuthType(str, enum.Enum):
+ MD5 = 'MD5'
+ SHA = 'SHA'
+
+ def to_json(self) -> str:
+ return self.value
+
+ class SNMPPrivacyType(str, enum.Enum):
+ DES = 'DES'
+ AES = 'AES'
+
+ def to_json(self) -> str:
+ return self.value
+
valid_destination_types = [
'Name:Port',
'IPv4:Port'
def __init__(self,
service_type: str = 'snmp-gateway',
- snmp_version: Optional[str] = None,
+ snmp_version: Optional[SNMPVersion] = None,
snmp_destination: str = '',
credentials: Dict[str, str] = {},
engine_id: Optional[str] = None,
- auth_protocol: Optional[str] = None,
- privacy_protocol: Optional[str] = None,
+ auth_protocol: Optional[SNMPAuthType] = None,
+ privacy_protocol: Optional[SNMPPrivacyType] = None,
placement: Optional[PlacementSpec] = None,
unmanaged: bool = False,
preview_only: bool = False,
self.auth_protocol = auth_protocol
self.privacy_protocol = privacy_protocol
+ @classmethod
+ def _from_json_impl(cls, json_spec: dict) -> 'SNMPGatewaySpec':
+
+ cpy = json_spec.copy()
+ types = [
+ ('snmp_version', SNMPGatewaySpec.SNMPVersion),
+ ('auth_protocol', SNMPGatewaySpec.SNMPAuthType),
+ ('privacy_protocol', SNMPGatewaySpec.SNMPPrivacyType),
+ ]
+ for d in cpy, cpy.get('spec', {}):
+ for key, enum_cls in types:
+ try:
+ if key in d:
+ d[key] = enum_cls(d[key])
+ except ValueError:
+ raise SpecValidationError(f'{key} unsupported. Must be one of '
+ f'{", ".join(enum_cls)}')
+ return super(SNMPGatewaySpec, cls)._from_json_impl(cpy)
+
@property
def ports(self) -> List[int]:
return [self.port]
def validate(self) -> None:
super(SNMPGatewaySpec, self).validate()
- def _check_type(name: str, value: Optional[str], options: List[str]) -> None:
- if not value:
- return
- if value not in options:
- raise SpecValidationError(
- f'{name} unsupported. Must be one of {", ".join(sorted(options))}'
- )
-
if not self.credentials:
raise SpecValidationError(
'Missing authentication information (credentials). '
'Missing SNMP version (snmp_version)'
)
- _check_type('snmp_version',
- self.snmp_version,
- list(set(opt.value for opt in SNMPVersion)))
- _check_type('auth_protocol',
- self.auth_protocol,
- list(set(opt.value for opt in SNMPAuthType)))
- _check_type('privacy_protocol',
- self.privacy_protocol,
- list(set(opt.value for opt in SNMPPrivacyType)))
-
creds_requirement = {
'V2c': ['snmp_community'],
'V3': ['snmp_v3_auth_username', 'snmp_v3_auth_password']