class MonService(CephadmService):
TYPE = 'mon'
- def create(self, daemon_spec: CephadmDaemonSpec):
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
"""
Create a new monitor on the given host.
"""
class MgrService(CephadmService):
TYPE = 'mgr'
- def create(self, daemon_spec: CephadmDaemonSpec):
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
"""
Create a new manager instance on a host.
"""
'value': spec.service_id,
})
- def create(self, daemon_spec: CephadmDaemonSpec):
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
mds_id, host = daemon_spec.daemon_id, daemon_spec.host
# get mgr. key
spec.service_name(), spec.placement.pretty_str()))
self.mgr.spec_store.save(spec)
- def create(self, daemon_spec: CephadmDaemonSpec):
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
rgw_id, host = daemon_spec.daemon_id, daemon_spec.host
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
class RbdMirrorService(CephadmService):
TYPE = 'rbd-mirror'
- def create(self, daemon_spec: CephadmDaemonSpec):
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
-
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
'entity': 'client.rbd-mirror.' + daemon_id,
class CrashService(CephadmService):
TYPE = 'crash'
- def create(self, daemon_spec: CephadmDaemonSpec):
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
-
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
'entity': 'client.crash.' + host,
TYPE = 'grafana'
DEFAULT_SERVICE_PORT = 3000
- def create(self, daemon_spec: CephadmDaemonSpec):
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
return self.mgr._create_daemon(daemon_spec)
def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
TYPE = 'alertmanager'
DEFAULT_SERVICE_PORT = 9093
- def create(self, daemon_spec: CephadmDaemonSpec):
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
return self.mgr._create_daemon(daemon_spec)
def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
TYPE = 'prometheus'
DEFAULT_SERVICE_PORT = 9095
- def create(self, daemon_spec: CephadmDaemonSpec):
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
return self.mgr._create_daemon(daemon_spec)
def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
class NodeExporterService(CephadmService):
TYPE = 'node-exporter'
- def create(self, daemon_spec: CephadmDaemonSpec):
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
return self.mgr._create_daemon(daemon_spec)
def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]: