def evaluate(self):
        """Resolve this completion by delegating to ``finalize``.

        Passes ``None`` as the result — presumably this completion type
        carries no staged value of its own; NOTE(review): confirm against
        the class that defines ``finalize``.
        """
        self.finalize(None)
+
def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]:
"""
Decorator to make CephadmCompletion methods return
self.registry_username: Optional[str] = None
self.registry_password: Optional[str] = None
- self._cons = {} # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]]
-
+ self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
+ remoto.backends.LegacyModuleExecute]] = {}
self.notify('mon_map', None)
self.config_notify()
if h not in self.inventory:
self.cache.rm_host(h)
-
# in-memory only.
self.events = EventStore(self)
self.offline_hosts: Set[str] = set()
return f"Host {host} failed to login to {url} as {username} with given password"
return
-
def _check_host(self, host):
if host not in self.inventory:
return
]
if forcename:
if len([d for d in existing if d.daemon_id == forcename]):
- raise orchestrator.OrchestratorValidationError(f'name {daemon_type}.{forcename} already in use')
+ raise orchestrator.OrchestratorValidationError(
+ f'name {daemon_type}.{forcename} already in use')
return forcename
if '.' in host:
for _ in range(6))
if len([d for d in existing if d.daemon_id == name]):
if not suffix:
- raise orchestrator.OrchestratorValidationError(f'name {daemon_type}.{name} already in use')
+ raise orchestrator.OrchestratorValidationError(
+ f'name {daemon_type}.{name} already in use')
self.log.debug('name %s exists, trying again', name)
continue
return name
if host in self.offline_hosts:
self.offline_hosts.remove(host)
-
@staticmethod
def can_run():
if remoto is not None:
return True, ""
else:
return False, "loading remoto library:{}".format(
- remoto_import_error)
+ remoto_import_error)
def available(self):
"""
Does nothing, as completions are processed in another thread.
"""
if completions:
- self.log.debug("process: completions={0}".format(orchestrator.pretty_print(completions)))
+ self.log.debug("process: completions={0}".format(
+ orchestrator.pretty_print(completions)))
for p in completions:
p.evaluate()
return HandleCommandResult(stdout=ssh_config)
return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)
-
@orchestrator._cli_write_command(
'cephadm generate-key',
desc='Generate a cluster SSH key (if not present)')
host = self.cache.get_hosts()[0]
r = self._check_host(host)
if r is not None:
- #connection failed reset user
+ # connection failed reset user
self.set_store('ssh_user', current_user)
self._reconfig_ssh()
return -errno.EINVAL, '', 'ssh connection %s@%s failed' % (user, host)
# if password not given in command line, get it through file input
if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
- "or -i <login credentials json file>")
+ "or -i <login credentials json file>")
elif not (url and username and password):
login_info = json.loads(inbuf)
if "url" in login_info and "username" in login_info and "password" in login_info:
password = login_info["password"]
else:
return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
- "Please setup json file as\n"
- "{\n"
- " \"url\": \"REGISTRY_URL\",\n"
- " \"username\": \"REGISTRY_USERNAME\",\n"
- " \"password\": \"REGISTRY_PASSWORD\"\n"
- "}\n")
+ "Please setup json file as\n"
+ "{\n"
+ " \"url\": \"REGISTRY_URL\",\n"
+ " \"username\": \"REGISTRY_USERNAME\",\n"
+ " \"password\": \"REGISTRY_PASSWORD\"\n"
+ "}\n")
# verify login info works by attempting login on random host
host = None
for host_name in self.inventory.keys():
except OrchestratorError as e:
self.log.exception(f"check-host failed for '{host}'")
return 1, '', ('check-host failed:\n' +
- f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
+ f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
# if we have an outstanding health alert for this host, give the
# serve thread a kick
if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
n = self.ssh_user + '@' + host
self.log.debug("Opening connection to {} with ssh options '{}'".format(
n, self._ssh_options))
- child_logger=self.log.getChild(n)
+ child_logger = self.log.getChild(n)
child_logger.setLevel('WARNING')
conn = remoto.Connection(
n,
raise RuntimeError("Executable '{}' not found on host '{}'".format(
executable, conn.hostname))
self.log.debug("Found executable '{}' at path '{}'".format(executable,
- executable_path))
+ executable_path))
return executable_path
@contextmanager
def _remote_connection(self,
host: str,
- addr: Optional[str]=None,
+ addr: Optional[str] = None,
) -> Iterator[Tuple["BaseConnection", Any]]:
if not addr and host in self.inventory:
addr = self.inventory.get_addr(host)
no_fsid: Optional[bool] = False,
error_ok: Optional[bool] = False,
image: Optional[str] = "",
- env_vars: Optional[List[str]]= None,
+ env_vars: Optional[List[str]] = None,
) -> Tuple[List[str], List[str], int]:
"""
Run cephadm on the remote host with the given command + args
host, remotes.PYTHONS, remotes.PATH))
try:
out, err, code = remoto.process.check(
- conn,
- [python, '-u'],
- stdin=script.encode('utf-8'))
+ conn,
+ [python, '-u'],
+ stdin=script.encode('utf-8'))
except RuntimeError as e:
self._reset_con(host)
if error_ok:
code, '\n'.join(err)))
return out, err, code
-
def _get_hosts(self, label: Optional[str] = '', as_hostspec: bool = False) -> List:
        """Return the inventory hosts matching ``label`` as a list.

        Thin wrapper: forwards ``label`` and ``as_hostspec`` unchanged to
        ``self.inventory.filter_by_label`` and materializes the resulting
        iterable with ``list``.  The element type of the result is decided
        by that helper — presumably hostnames vs. HostSpec objects depending
        on ``as_hostspec``; NOTE(review): confirm against Inventory.
        """
        return list(self.inventory.filter_by_label(label=label, as_hostspec=as_hostspec))
if dd.hostname == hostname:
daemon_map[dd.daemon_type].append(dd.daemon_id)
- for daemon_type,daemon_ids in daemon_map.items():
+ for daemon_type, daemon_ids in daemon_map.items():
r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
if r.retval:
self.log.error(f'It is NOT safe to stop host {hostname}')
raise orchestrator.OrchestratorError(
- r.stderr,
- errno=r.retval)
+ r.stderr,
+ errno=r.retval)
msg = f'It is presumed safe to stop host {hostname}'
self.log.info(msg)
if self.cache.host_needs_registry_login(host) and self.registry_url:
self.log.debug(f"Logging `{host}` into custom registry")
- r = self._registry_login(host, self.registry_url, self.registry_username, self.registry_password)
+ r = self._registry_login(host, self.registry_url,
+ self.registry_username, self.registry_password)
if r:
bad_hosts.append(r)
if action == 'redeploy':
if self.daemon_is_self(daemon_type, daemon_id):
self.mgr_service.fail_over()
- return # unreachable.
+ return # unreachable.
# stop, recreate the container+unit, then restart
return self._create_daemon(daemon_spec)
elif action == 'reconfig':
})
@trivial_completion
- def daemon_action(self, action: str, daemon_name: str, image: Optional[str]=None) -> str:
+ def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
d = self.cache.get_daemon(daemon_name)
if action == 'redeploy' and self.daemon_is_self(d.daemon_type, d.daemon_id) \
osd_uuid_map: Optional[Dict[str, Any]] = None,
) -> str:
-
with set_exception_subject('service', orchestrator.DaemonDescription(
daemon_type=daemon_spec.daemon_type,
daemon_id=daemon_spec.daemon_id,
).service_id(), overwrite=True):
start_time = datetime.datetime.utcnow()
- cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(daemon_spec)
+ cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(
+ daemon_spec)
daemon_spec.extra_args.extend(['--config-json', '-'])
# TCP port to open in the host firewall
if daemon_spec.ports:
- daemon_spec.extra_args.extend(['--tcp-ports', ' '.join(map(str,daemon_spec.ports))])
+ daemon_spec.extra_args.extend(
+ ['--tcp-ports', ' '.join(map(str, daemon_spec.ports))])
# osd deployments needs an --osd-uuid arg
if daemon_spec.daemon_type == 'osd':
daemon_spec.extra_args.append('--allow-ptrace')
if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url:
- self._registry_login(daemon_spec.host, self.registry_url, self.registry_username, self.registry_password)
+ self._registry_login(daemon_spec.host, self.registry_url,
+ self.registry_username, self.registry_password)
self.log.info('%s daemon %s on %s' % (
'Reconfiguring' if reconfig else 'Deploying',
if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
self.requires_post_actions.add(daemon_spec.daemon_type)
self.cache.invalidate_host_daemons(daemon_spec.host)
- self.cache.update_daemon_config_deps(daemon_spec.host, daemon_spec.name(), deps, start_time)
+ self.cache.update_daemon_config_deps(
+ daemon_spec.host, daemon_spec.name(), deps, start_time)
self.cache.save_host(daemon_spec.host)
msg = "{} {} on host '{}'".format(
'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
else:
what = 'reconfigure' if reconfig else 'deploy'
- self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
+ self.events.for_daemon(
+ daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
return msg
@forall_hosts
hostname=host,
).service_id(), overwrite=True):
-
self.cephadm_services[daemon_type].pre_remove(daemon_id)
args = ['--name', name, '--force']
config_func(spec)
did_config = True
- daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(host, daemon_id, network, spec)
+ daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(
+ host, daemon_id, network, spec)
self.log.debug('Placing %s.%s on host %s' % (
daemon_type, daemon_id, host))
def _apply_all_services(self):
r = False
- specs = [] # type: List[ServiceSpec]
+ specs = [] # type: List[ServiceSpec]
for sn, spec in self.spec_store.specs.items():
specs.append(spec)
for spec in specs:
# These daemon types require additional configs after creation
if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
daemons_post[dd.daemon_type].append(dd)
-
+
if self.cephadm_services[dd.daemon_type].get_active_daemon(
self.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
dd.is_active = True
last_deps = []
action = self.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
if not last_config:
- self.log.info('Reconfiguring %s (unknown last config time)...'% (
+ self.log.info('Reconfiguring %s (unknown last config time)...' % (
dd.name()))
action = 'reconfig'
elif last_deps != deps:
action = 'reconfig'
elif self.last_monmap and \
self.last_monmap > last_config and \
- dd.daemon_type in CEPH_TYPES:
+ dd.daemon_type in CEPH_TYPES:
self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
action = 'reconfig'
if action:
config_func(spec)
did_config = True
- daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(host, daemon_id, network, spec)
+ daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(
+ host, daemon_id, network, spec)
self.log.debug('Placing %s.%s on host %s' % (
daemon_type, daemon_id, host))
args.append(daemon_spec)
}
spec.placement = defaults[spec.service_type]
elif spec.service_type in ['mon', 'mgr'] and \
- spec.placement.count is not None and \
- spec.placement.count < 1:
+ spec.placement.count is not None and \
+ spec.placement.count < 1:
raise OrchestratorError('cannot scale %s service below 1' % (
spec.service_type))
if not host:
raise OrchestratorError('no hosts defined')
if self.cache.host_needs_registry_login(host) and self.registry_url:
- self._registry_login(host, self.registry_url, self.registry_username, self.registry_password)
+ self._registry_login(host, self.registry_url,
+ self.registry_username, self.registry_password)
out, err, code = self._run_cephadm(
host, '', 'pull', [],
image=image_name,