'rgw': self.rgw_service.config,
'nfs': self.nfs_service.config,
'iscsi': self.iscsi_service.config,
- }.get(service_type)
+ }.get(service_type) # type: ignore
def _apply_service(self, spec: ServiceSpec) -> bool:
"""
class MdsService(CephadmService):
TYPE = 'mds'
- def config(self, spec: ServiceSpec):
+ def config(self, spec: ServiceSpec) -> None:
# ensure mds_join_fs is set for these daemons
assert spec.service_id
ret, out, err = self.mgr.check_mon_command({
class RgwService(CephadmService):
TYPE = 'rgw'
- def config(self, spec: RGWSpec):
+ def config(self, spec: RGWSpec) -> None:
# ensure rgw_realm and rgw_zone is set for these daemons
ret, out, err = self.mgr.check_mon_command({
'prefix': 'config set',
class IscsiService(CephadmService):
TYPE = 'iscsi'
- def config(self, spec: IscsiServiceSpec):
+ def config(self, spec: IscsiServiceSpec) -> None:
self.mgr._check_pool_exists(spec.pool, spec.service_name())
logger.info('Saving service %s spec with placement %s' % (
return cephadm_config, deps
- def config(self, spec):
+ def config(self, spec: NFSServiceSpec) -> None:
self.mgr._check_pool_exists(spec.pool, spec.service_name())
logger.info('Saving service %s spec with placement %s' % (
spec.service_name(), spec.placement.pretty_str()))