from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
- CustomContainerSpec
+ CustomContainerSpec, HostPlacementSpec
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonSpec
# (https://github.com/alfredodeza/remoto/pull/56) lands
from distutils.version import StrictVersion
if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
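# remoto releases up to 1.2 do not provide has_connection(); define a shim
# (backed by gateway.hasreceiver()) and attach it to remoto's BaseConnection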
- def remoto_has_connection(self):
+ def remoto_has_connection(self: Any) -> bool:
return self.gateway.hasreceiver()
from remoto.backends import BaseConnection
class CephadmCompletion(orchestrator.Completion[T]):
- def evaluate(self):
+ def evaluate(self) -> None:
self.finalize(None)
"""
@wraps(f)
- def wrapper(*args, **kwargs):
+ def wrapper(*args: Any, **kwargs: Any) -> CephadmCompletion:
return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))
return wrapper
-def service_inactive(spec_name: str):
- def inner(func):
+def service_inactive(spec_name: str) -> Callable:
+ def inner(func: Callable) -> Callable:
@wraps(func)
- def wrapper(*args, **kwargs):
+ def wrapper(*args: Any, **kwargs: Any) -> Any:
obj = args[0]
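# a stored spec means the service is deployed (active); refuse to change its
# configuration in that case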
if obj.get_store(f"spec.{spec_name}") is not None:
return 1, "", f"Unable to change configuration of an active service {spec_name}"
}
]
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any):
super(CephadmOrchestrator, self).__init__(*args, **kwargs)
self._cluster_fsid = self.get('mon_map')['fsid']
self.last_monmap: Optional[datetime.datetime] = None
self.upgrade = CephadmUpgrade(self)
- self.health_checks = {}
+ self.health_checks: Dict[str, dict] = {}
self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
self.template = TemplateMgr(self)
- self.requires_post_actions = set()
+ self.requires_post_actions: Set[str] = set()
- def shutdown(self):
+ def shutdown(self) -> None:
self.log.debug('shutdown')
self._worker_pool.close()
self._worker_pool.join()
assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
return self.cephadm_services[service_type]
- def _kick_serve_loop(self):
+ def _kick_serve_loop(self) -> None:
self.log.debug('_kick_serve_loop')
self.event.set()
# function responsible for logging a single host into a custom registry
- def _registry_login(self, host, url, username, password):
+ def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
# want to pass info over stdin rather than through normal list of args
- args_str = ("{\"url\": \"" + url + "\", \"username\": \"" + username + "\", "
- " \"password\": \"" + password + "\"}")
+ args_str = json.dumps({
+ 'url': url,
+ 'username': username,
+ 'password': password,
+ })
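+ # json.dumps takes care of quoting/escaping that the previous hand-built string did not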
out, err, code = self._run_cephadm(
host, 'mon', 'registry-login',
['--registry-json', '-'], stdin=args_str, error_ok=True)
if code:
return f"Host {host} failed to login to {url} as {username} with given password"
- return
+ return None
def serve(self) -> None:
"""
serve = CephadmServe(self)
serve.serve()
- def set_container_image(self, entity: str, image):
+ def set_container_image(self, entity: str, image: str) -> None:
self.check_mon_command({
'prefix': 'config set',
'name': 'container_image',
'value': image,
'who': entity,
})
- def config_notify(self):
+ def config_notify(self) -> None:
"""
This method is called whenever one of our config options is changed.
self.event.set()
- def notify(self, notify_type, notify_id):
+ def notify(self, notify_type: str, notify_id: Optional[str]) -> None:
if notify_type == "mon_map":
# get monmap mtime so we can refresh configs when mons change
monmap = self.get('mon_map')
if notify_type == "pg_summary":
self._trigger_osd_removal()
- def _trigger_osd_removal(self):
+ def _trigger_osd_removal(self) -> None:
data = self.get("osd_stats")
for osd in data.get('osd_stats', []):
if osd.get('num_pgs') == 0:
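# an OSD reporting zero PGs has finished draining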
# start the process
self.rm_util.process_removal_queue()
- def pause(self):
+ def pause(self) -> None:
if not self.paused:
self.log.info('Paused')
self.set_store('pause', 'true')
# wake loop so we update the health status
self._kick_serve_loop()
- def resume(self):
+ def resume(self) -> None:
if self.paused:
self.log.info('Resumed')
self.paused = False
continue
return name
- def _reconfig_ssh(self):
+ def _reconfig_ssh(self) -> None:
temp_files = [] # type: list
ssh_options = [] # type: List[str]
self._reset_cons()
- def validate_ssh_config_content(self, ssh_config):
+ def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
if ssh_config is None or len(ssh_config.strip()) == 0:
raise OrchestratorValidationError('ssh_config cannot be empty')
# StrictHostKeyChecking is [yes|no] ?
if 'ask' in s.lower():
raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')
- def validate_ssh_config_fname(self, ssh_config_fname):
+ def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
if not os.path.isfile(ssh_config_fname):
raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
ssh_config_fname))
- def _reset_con(self, host):
+ def _reset_con(self, host: str) -> None:
conn, r = self._cons.get(host, (None, None))
if conn:
self.log.debug('_reset_con close %s' % host)
conn.exit()
del self._cons[host]
- def _reset_cons(self):
+ def _reset_cons(self) -> None:
for host, conn_and_r in self._cons.items():
self.log.debug('_reset_cons close %s' % host)
conn, r = conn_and_r
conn.exit()
self._cons = {}
- def offline_hosts_remove(self, host):
+ def offline_hosts_remove(self, host: str) -> None:
if host in self.offline_hosts:
self.offline_hosts.remove(host)
@staticmethod
- def can_run():
+ def can_run() -> Tuple[bool, str]:
if remoto is not None:
return True, ""
else:
return False, "loading remoto library:{}".format(
remoto_import_error)
- def available(self):
+ def available(self) -> Tuple[bool, str]:
"""
The cephadm orchestrator is always available.
"""
return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
return True, ''
- def process(self, completions):
+ def process(self, completions: List[CephadmCompletion]) -> None:
"""
Does nothing, as completions are processed in another thread.
"""
@orchestrator._cli_write_command(
prefix='cephadm set-ssh-config',
desc='Set the ssh_config file (use -i <ssh_config>)')
- def _set_ssh_config(self, inbuf=None):
+ def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
"""
Set an ssh_config file provided from stdin
"""
@orchestrator._cli_write_command(
prefix='cephadm clear-ssh-config',
desc='Clear the ssh_config file')
- def _clear_ssh_config(self):
+ def _clear_ssh_config(self) -> Tuple[int, str, str]:
"""
Clear the ssh_config file
"""
prefix='cephadm get-ssh-config',
desc='Returns the ssh config as used by cephadm'
)
- def _get_ssh_config(self):
+ def _get_ssh_config(self) -> HandleCommandResult:
if self.ssh_config_file:
self.validate_ssh_config_fname(self.ssh_config_file)
with open(self.ssh_config_file) as f:
@orchestrator._cli_write_command(
'cephadm generate-key',
desc='Generate a cluster SSH key (if not present)')
- def _generate_key(self):
+ def _generate_key(self) -> Tuple[int, str, str]:
if not self.ssh_pub or not self.ssh_key:
self.log.info('Generating ssh key...')
tmp_dir = TemporaryDirectory()
@orchestrator._cli_write_command(
'cephadm set-priv-key',
desc='Set cluster SSH private key (use -i <private_key>)')
- def _set_priv_key(self, inbuf=None):
+ def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
if inbuf is None or len(inbuf) == 0:
return -errno.EINVAL, "", "empty private ssh key provided"
if inbuf == self.ssh_key:
@orchestrator._cli_write_command(
'cephadm set-pub-key',
desc='Set cluster SSH public key (use -i <public_key>)')
- def _set_pub_key(self, inbuf=None):
+ def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
if inbuf is None or len(inbuf) == 0:
return -errno.EINVAL, "", "empty public ssh key provided"
if inbuf == self.ssh_pub:
@orchestrator._cli_write_command(
'cephadm clear-key',
desc='Clear cluster SSH key')
- def _clear_key(self):
+ def _clear_key(self) -> Tuple[int, str, str]:
self.set_store('ssh_identity_key', None)
self.set_store('ssh_identity_pub', None)
self._reconfig_ssh()
@orchestrator._cli_read_command(
'cephadm get-pub-key',
desc='Show SSH public key for connecting to cluster hosts')
- def _get_pub_key(self):
+ def _get_pub_key(self) -> Tuple[int, str, str]:
if self.ssh_pub:
return 0, self.ssh_pub, ''
else:
@orchestrator._cli_read_command(
'cephadm get-user',
desc='Show user for SSHing to cluster hosts')
- def _get_user(self):
+ def _get_user(self) -> Tuple[int, str, str]:
return 0, self.ssh_user, ''
@orchestrator._cli_read_command(
'cephadm set-user',
'name=user,type=CephString',
'Set user for SSHing to cluster hosts; passwordless sudo will be needed for non-root users')
- def set_ssh_user(self, user):
+ def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
current_user = self.ssh_user
if user == current_user:
return 0, "value unchanged", ""
"name=username,type=CephString,req=false "
"name=password,type=CephString,req=false",
'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)')
- def registry_login(self, url=None, username=None, password=None, inbuf=None):
+ def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
# if the password was not given on the command line, get it through file input
if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
"or -i <login credentials json file>")
elif not (url and username and password):
+ assert isinstance(inbuf, str)
login_info = json.loads(inbuf)
if "url" in login_info and "username" in login_info and "password" in login_info:
url = login_info["url"]
'name=host,type=CephString '
'name=addr,type=CephString,req=false',
'Check whether we can access and manage a remote host')
- def check_host(self, host, addr=None):
+ def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
try:
out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
['--expect-hostname', host],
for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
if item.startswith('host %s ' % host):
self.event.set()
- return 0, '%s (%s) ok' % (host, addr), err
+ return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
@orchestrator._cli_read_command(
'cephadm prepare-host',
'name=host,type=CephString '
'name=addr,type=CephString,req=false',
'Prepare a remote host for use with cephadm')
- def _prepare_host(self, host, addr=None):
+ def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host',
['--expect-hostname', host],
addr=addr,
for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
if item.startswith('host %s ' % host):
self.event.set()
- return 0, '%s (%s) ok' % (host, addr), err
+ return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
@orchestrator._cli_write_command(
prefix='cephadm set-extra-ceph-conf',
"Mainly a workaround, till `config generate-minimal-conf` generates\n"
"a complete ceph.conf.\n\n"
"Warning: this is a dangerous operation.")
- def _set_extra_ceph_conf(self, inbuf=None) -> HandleCommandResult:
+ def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
if inbuf:
# sanity check.
cp = ConfigParser()
prefix='cephadm generate-exporter-config',
desc='Generate default SSL crt/key and token for cephadm exporter daemons')
@service_inactive('cephadm-exporter')
- def _generate_exporter_config(self):
+ def _generate_exporter_config(self) -> Tuple[int, str, str]:
self._set_exporter_defaults()
self.log.info('Default settings created for cephadm exporter(s)')
return 0, "", ""
- def _set_exporter_defaults(self):
+ def _set_exporter_defaults(self) -> None:
crt, key = self._generate_exporter_ssl()
token = self._generate_exporter_token()
self._set_exporter_config({
})
self._set_exporter_option('enabled', 'true')
- def _generate_exporter_ssl(self):
+ def _generate_exporter_ssl(self) -> Tuple[str, str]:
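# returns a self-signed (cert, key) pair used for the exporter's SSL configuration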
return create_self_signed_cert(dname={"O": "Ceph", "OU": "cephadm-exporter"})
- def _generate_exporter_token(self):
+ def _generate_exporter_token(self) -> str:
return secrets.token_hex(32)
@orchestrator._cli_write_command(
prefix='cephadm clear-exporter-config',
desc='Clear the SSL configuration used by cephadm exporter daemons')
@service_inactive('cephadm-exporter')
- def _clear_exporter_config(self):
+ def _clear_exporter_config(self) -> Tuple[int, str, str]:
self._clear_exporter_config_settings()
self.log.info('Cleared cephadm exporter configuration')
return 0, "", ""
prefix='cephadm set-exporter-config',
desc='Set custom cephadm-exporter configuration from a json file (-i <file>). JSON must contain crt, key, token and port')
@service_inactive('cephadm-exporter')
- def _store_exporter_config(self, inbuf=None):
+ def _store_exporter_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
if not inbuf:
return 1, "", "JSON configuration has not been provided (-i <filename>)"
@orchestrator._cli_read_command(
'cephadm get-exporter-config',
desc='Show the current cephadm-exporter configuration (JSON)')
- def _show_exporter_config(self):
+ def _show_exporter_config(self) -> Tuple[int, str, str]:
cfg = self._get_exporter_config()
return 0, json.dumps(cfg, indent=2), ""
return False
return conf.last_modified > dt
- def _get_connection(self, host: str):
+ def _get_connection(self, host: str) -> Tuple['remoto.backends.BaseConnection',
+ 'remoto.backends.LegacyModuleExecute']:
"""
Setup a connection for running commands on remote host.
"""
return conn, r
- def _executable_path(self, conn, executable):
+ def _executable_path(self, conn: 'remoto.backends.BaseConnection', executable: str) -> str:
"""
Remote validator that accepts a connection object to ensure that a certain
executable is available, returning its full path if so.
return "Removed host '{}'".format(host)
@trivial_completion
- def update_host_addr(self, host, addr) -> str:
+ def update_host_addr(self, host: str, addr: str) -> str:
self.inventory.set_addr(host, addr)
self._reset_con(host)
self.event.set() # refresh stray health check
return list(self.inventory.all_specs())
@trivial_completion
- def add_host_label(self, host, label) -> str:
+ def add_host_label(self, host: str, label: str) -> str:
self.inventory.add_label(host, label)
self.log.info('Added label %s to host %s' % (label, host))
return 'Added label %s to host %s' % (label, host)
@trivial_completion
- def remove_host_label(self, host, label) -> str:
+ def remove_host_label(self, host: str, label: str) -> str:
self.inventory.rm_label(host, label)
self.log.info('Removed label %s from host %s' % (label, host))
return 'Removed label %s from host %s' % (label, host)
@trivial_completion
- def host_ok_to_stop(self, hostname: str):
+ def host_ok_to_stop(self, hostname: str) -> str:
if hostname not in self.cache.get_hosts():
raise OrchestratorError(f'Cannot find host "{hostname}"')
config += '\n\n' + extra.strip() + '\n'
return config
- def _invalidate_daemons_and_kick_serve(self, filter_host=None):
+ def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None:
if filter_host:
self.cache.invalidate_host_daemons(filter_host)
else:
return result
@trivial_completion
- def service_action(self, action, service_name) -> List[str]:
+ def service_action(self, action: str, service_name: str) -> List[str]:
args = []
for host, dm in self.cache.daemons.items():
for name, d in dm.items():
return self._daemon_actions(args)
@forall_hosts
- def _daemon_actions(self, daemon_type, daemon_id, host, action) -> str:
+ def _daemon_actions(self, daemon_type: str, daemon_id: str, host: str, action: str) -> str:
with set_exception_subject('daemon', DaemonDescription(
daemon_type=daemon_type,
daemon_id=daemon_id
).name()):
return self._daemon_action(daemon_type, daemon_id, host, action)
- def _daemon_action(self, daemon_type, daemon_id, host, action, image=None) -> str:
+ def _daemon_action(self, daemon_type: str, daemon_id: str, host: str, action: str, image: Optional[str] = None) -> str:
daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
host=host,
daemon_id=daemon_id,
self.events.for_daemon(name, 'INFO', msg)
return msg
- def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str):
+ def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
if image is not None:
if action != 'redeploy':
raise OrchestratorError(
def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
- def _schedule_daemon_action(self, daemon_name: str, action: str):
+ def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
dd = self.cache.get_daemon(daemon_name)
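# redeploying the active mgr is only possible when a standby mgr exists to take over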
if action == 'redeploy' and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
and not self.mgr_service.mgr_map_has_standby():
return self._remove_daemons(args)
@trivial_completion
- def remove_service(self, service_name) -> str:
+ def remove_service(self, service_name: str) -> str:
self.log.info('Remove service %s' % service_name)
self._trigger_preview_refresh(service_name=service_name)
found = self.spec_store.rm(service_name)
return f'Failed to remove service. <{service_name}> was not found.'
@trivial_completion
- def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh=False) -> List[orchestrator.InventoryHost]:
+ def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
"""
Return the storage inventory of hosts matching the given filter.
return result
@trivial_completion
- def zap_device(self, host, path) -> str:
+ def zap_device(self, host: str, path: str) -> str:
self.log.info('Zap device %s:%s' % (host, path))
out, err, code = self._run_cephadm(
host, 'osd', 'ceph-volume',
See templates/blink_device_light_cmd.j2
"""
@forall_hosts
- def blink(host, dev, path):
+ def blink(host: str, dev: str, path: str) -> str:
cmd_line = self.template.render('blink_device_light_cmd.j2',
{
'on': on,
def _preview_osdspecs(self,
osdspecs: Optional[List[DriveGroupSpec]] = None
- ):
+ ) -> dict:
if not osdspecs:
return {'n/a': [{'error': True,
'message': 'No OSDSpec or matching hosts found.'}]}
previews_for_specs.update({host: osd_reports})
return previews_for_specs
- def _calc_daemon_deps(self, daemon_type, daemon_id):
+ def _calc_daemon_deps(self, daemon_type: str, daemon_id: str) -> List[str]:
need = {
'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
'grafana': ['prometheus'],
def _create_daemon(self,
daemon_spec: CephadmDaemonSpec,
- reconfig=False,
+ reconfig: bool = False,
osd_uuid_map: Optional[Dict[str, Any]] = None,
) -> str:
return code == 0
@forall_hosts
- def _remove_daemons(self, name, host) -> str:
+ def _remove_daemons(self, name: str, host: str) -> str:
return self._remove_daemon(name, host)
- def _remove_daemon(self, name, host) -> str:
+ def _remove_daemon(self, name: str, host: str) -> str:
"""
Remove a daemon
"""
return "Removed {} from host '{}'".format(name, host)
- def _check_pool_exists(self, pool, service_name):
+ def _check_pool_exists(self, pool: str, service_name: str) -> None:
logger.info(f'Checking pool "{pool}" exists for service {service_name}')
if not self.rados.pool_exists(pool):
raise OrchestratorError(f'Cannot find pool "{pool}" for '
f'service {service_name}')
- def _add_daemon(self, daemon_type, spec,
- create_func: Callable[..., CephadmDaemonSpec], config_func=None) -> List[str]:
+ def _add_daemon(self,
+ daemon_type: str,
+ spec: ServiceSpec,
+ create_func: Callable[..., CephadmDaemonSpec],
+ config_func: Optional[Callable] = None) -> List[str]:
"""
Add (and place) a daemon. Require explicit host placement. Do not
schedule, and do not apply the related scheduling limitations.
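In other words, spec.placement is expected to already name the target hosts
(for example PlacementSpec(hosts=['host1', 'host2'])); no scheduling or host
selection happens here.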
spec.placement.hosts, count,
create_func, config_func)
- def _create_daemons(self, daemon_type, spec, daemons,
- hosts, count,
- create_func: Callable[..., CephadmDaemonSpec], config_func=None) -> List[str]:
+ def _create_daemons(self,
+ daemon_type: str,
+ spec: ServiceSpec,
+ daemons: List[DaemonDescription],
+ hosts: List[HostPlacementSpec],
+ count: int,
+ create_func: Callable[..., CephadmDaemonSpec],
+ config_func: Optional[Callable] = None) -> List[str]:
if count > len(hosts):
raise OrchestratorError('too few hosts: want %d, have %s' % (
count, hosts))
daemons.append(sd)
@forall_hosts
- def create_func_map(*args):
+ def create_func_map(*args: Any) -> str:
daemon_spec = create_func(*args)
return self._create_daemon(daemon_spec)
return create_func_map(args)
@trivial_completion
- def apply_mon(self, spec) -> str:
+ def apply_mon(self, spec: ServiceSpec) -> str:
return self._apply(spec)
@trivial_completion
return self._apply_service_spec(cast(ServiceSpec, spec))
- def _plan(self, spec: ServiceSpec):
+ def _plan(self, spec: ServiceSpec) -> dict:
if spec.service_type == 'osd':
return {'service_name': spec.service_name(),
'service_type': spec.service_type,
return results
@trivial_completion
- def apply_mgr(self, spec) -> str:
+ def apply_mgr(self, spec: ServiceSpec) -> str:
return self._apply(spec)
@trivial_completion
return self._apply(spec)
@trivial_completion
- def add_rgw(self, spec) -> List[str]:
+ def add_rgw(self, spec: ServiceSpec) -> List[str]:
return self._add_daemon('rgw', spec, self.rgw_service.prepare_create, self.rgw_service.config)
@trivial_completion
- def apply_rgw(self, spec) -> str:
+ def apply_rgw(self, spec: ServiceSpec) -> str:
return self._apply(spec)
@trivial_completion
return self._add_daemon('iscsi', spec, self.iscsi_service.prepare_create, self.iscsi_service.config)
@trivial_completion
- def apply_iscsi(self, spec) -> str:
+ def apply_iscsi(self, spec: ServiceSpec) -> str:
return self._apply(spec)
@trivial_completion
- def add_rbd_mirror(self, spec) -> List[str]:
+ def add_rbd_mirror(self, spec: ServiceSpec) -> List[str]:
return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.prepare_create)
@trivial_completion
- def apply_rbd_mirror(self, spec) -> str:
+ def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
return self._apply(spec)
@trivial_completion
- def add_nfs(self, spec) -> List[str]:
+ def add_nfs(self, spec: ServiceSpec) -> List[str]:
return self._add_daemon('nfs', spec, self.nfs_service.prepare_create, self.nfs_service.config)
@trivial_completion
- def apply_nfs(self, spec) -> str:
+ def apply_nfs(self, spec: ServiceSpec) -> str:
return self._apply(spec)
def _get_dashboard_url(self):
return self.get('mgr_map').get('services', {}).get('dashboard', '')
@trivial_completion
- def add_prometheus(self, spec) -> List[str]:
+ def add_prometheus(self, spec: ServiceSpec) -> List[str]:
return self._add_daemon('prometheus', spec, self.prometheus_service.prepare_create)
@trivial_completion
- def apply_prometheus(self, spec) -> str:
+ def apply_prometheus(self, spec: ServiceSpec) -> str:
return self._apply(spec)
@trivial_completion
self.node_exporter_service.prepare_create)
@trivial_completion
- def apply_node_exporter(self, spec) -> str:
+ def apply_node_exporter(self, spec: ServiceSpec) -> str:
return self._apply(spec)
@trivial_completion
self.crash_service.prepare_create)
@trivial_completion
- def apply_crash(self, spec) -> str:
+ def apply_crash(self, spec: ServiceSpec) -> str:
return self._apply(spec)
@trivial_completion
self.cephadm_exporter_service.prepare_create)
@trivial_completion
- def apply_cephadm_exporter(self, spec) -> str:
+ def apply_cephadm_exporter(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- def _get_container_image_info(self, image_name) -> ContainerInspectInfo:
+ def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
# pick a random host...
host = None
for host_name in self.inventory.keys():
raise OrchestratorError(msg)
@trivial_completion
- def upgrade_check(self, image, version) -> str:
+ def upgrade_check(self, image: str, version: str) -> str:
if version:
target_name = self.container_image_base + ':v' + version
elif image:
image_info = self._get_container_image_info(target_name)
self.log.debug(f'image info {image} -> {image_info}')
- r = {
+ r: dict = {
'target_name': target_name,
'target_id': image_info.image_id,
'target_version': image_info.ceph_version,
return self.upgrade.upgrade_status()
@trivial_completion
- def upgrade_start(self, image, version) -> str:
+ def upgrade_start(self, image: str, version: str) -> str:
return self.upgrade.upgrade_start(image, version)
@trivial_completion
return "Scheduled OSD(s) for removal"
@trivial_completion
- def stop_remove_osds(self, osd_ids: List[str]):
+ def stop_remove_osds(self, osd_ids: List[str]) -> str:
"""
Stops the removal process for a list of OSDs.
This will revert their weight and remove them from the osds_to_remove queue
return "Stopped OSD(s) removal"
@trivial_completion
- def remove_osds_status(self):
+ def remove_osds_status(self) -> List[OSD]:
"""
The CLI call to retrieve an osd removal report
"""