from ceph.deployment.drive_group import DriveGroupSpec
ret = {
+ 'mon': MONSpec,
'rgw': RGWSpec,
'nfs': NFSServiceSpec,
'osd': DriveGroupSpec,
yaml.add_representer(MDSSpec, ServiceSpec.yaml_representer)
+class MONSpec(ServiceSpec):
+ def __init__(self,
+ service_type: str,
+ service_id: Optional[str] = None,
+ placement: Optional[PlacementSpec] = None,
+ count: Optional[int] = None,
+ config: Optional[Dict[str, str]] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ networks: Optional[List[str]] = None,
+ extra_container_args: Optional[List[str]] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ crush_locations: Optional[Dict[str, List[str]]] = None,
+ ):
+ assert service_type == 'mon'
+ super(MONSpec, self).__init__('mon', service_id=service_id,
+ placement=placement,
+ count=count,
+ config=config,
+ unmanaged=unmanaged,
+ preview_only=preview_only,
+ networks=networks,
+ extra_container_args=extra_container_args,
+ custom_configs=custom_configs)
+
+ self.crush_locations = crush_locations
+ self.validate()
+
+ def validate(self) -> None:
+ if self.crush_locations:
+ for host, crush_locs in self.crush_locations.items():
+ try:
+ assert_valid_host(host)
+ except SpecValidationError as e:
+ err_str = f'Invalid hostname found in spec crush locations: {e}'
+ raise SpecValidationError(err_str)
+ for cloc in crush_locs:
+ if '=' not in cloc or len(cloc.split('=')) != 2:
+ err_str = ('Crush locations must be of form <bucket>=<location>. '
+ f'Found crush location: {cloc}')
+ raise SpecValidationError(err_str)
+
+
+yaml.add_representer(MONSpec, ServiceSpec.yaml_representer)
+
+
class TracingSpec(ServiceSpec):
SERVICE_TYPES = ['elasticsearch', 'jaeger-collector', 'jaeger-query', 'jaeger-agent']